type-aligned data pipeline

This document provides an example of how one could use a type-aligned sequence (specifically TAList) to represent data pipelines. It will introduce some of the useful high-level methods on type-aligned lists.

The code examples assume that the following items have been imported:

import cats.data.Kleisli
import cats.effect.IO
import cats.implicits._
import com.salesforce.maligned._

import java.time.Duration

pipeline stages

A data pipeline is a sequence of stages, where each stage takes an input value and transforms it into an output value. For this example we’ll also declare that every stage has a name and can perform arbitrary IO when producing its output value.

trait Stage[A, B] {
  def name: String

  def apply(input: A): IO[B]
}

Now we can define a number of (slightly simplified) stages that you might find in a natural language processing (NLP) pipeline. At this point none of these stages performs side effects, so the IO in the return type isn't strictly necessary, but other stages might, so we'll leave it open as a possibility.

/**
 * Normalizes the case of characters in the input string.
 *
 * This simple version just converts everything to lower case.
 */
val normalizeCase = new Stage[String, String] {
  def name: String = "normalize-case"

  def apply(input: String): IO[String] = IO.pure(input.toLowerCase)
}

/** A logical grouping of characters, usually a word */
type Token = String

/** Splits an input string into a sequence of tokens (words) */
val tokenize = new Stage[String, List[Token]] {
  def name: String = "tokenize"

  def apply(input: String): IO[List[Token]] = IO.pure(input.split("""\s+""").toList)
}

/** Counts the number of occurrences of each token in the input sequence of tokens. */
val termFrequency = new Stage[List[Token], Map[Token, Int]] {
  def name: String = "term-frequency"

  def apply(input: List[Token]): IO[Map[Token, Int]] =
    IO.pure(input.foldMap(token => Map(token -> 1)))
}
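
Before wiring these stages into a pipeline, we can sanity-check one of them in isolation. This is a quick illustrative run, not part of the pipeline itself:

val tokens: List[Token] = tokenize("I think therefore I am").unsafeRunSync
// tokens == List("I", "think", "therefore", "I", "am")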

putting the stages together

Now that we have defined stages, we can define a data pipeline as a type-aligned list (TAList) of stages.

type Pipeline[A, B] = TAList[Stage, A, B]

A is the type of input data that is fed into the first stage of the pipeline and B is the type of output data returned by the last stage of the pipeline.

Now let’s create a “bag of words” text data pipeline. Treating text as a “bag of words” means that you simply count the number of times that each word occurs, ignoring the order of words completely. This approach can be surprisingly effective for certain tasks.

val bagOfWordsPipeline: Pipeline[String, Map[Token, Int]] =
  normalizeCase ::
    tokenize ::
    TANonEmptyList.one(termFrequency)

As you can see, we use TANonEmptyList.one to wrap the last element in a single-element type-aligned list and use :: to prepend the other stages to it. This is analogous to using 1 :: 2 :: NonEmptyList.one(3) to construct a Cats NonEmptyList.
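
Spelled out with its import, the Cats analogue looks like this (a small illustrative snippet, not used elsewhere on this page):

import cats.data.NonEmptyList

val nel: NonEmptyList[Int] = 1 :: 2 :: NonEmptyList.one(3)
// nel == NonEmptyList.of(1, 2, 3)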

We have a data pipeline! Read on to find out what we can do with it.

operations on type-aligned lists

Type-aligned lists have a number of operations that mirror standard list operations (such as map, foldLeft, etc.), but the type-aligned versions are generally a bit more complex.

toList

Sometimes it can be handy to avoid complexity by throwing away type information and converting a type-aligned list into a standard List that has unknown input and output types. For example, since each stage has a name field that is a String regardless of the input and output types of the stage, we can write a method to pretty-print the stage names using toList.

def prettyPrintStageNames[A, B](pipeline: Pipeline[A, B]): String =
  pipeline.toList.map(_.name).mkString_("->")

val bagOfWordsStageNames = prettyPrintStageNames(bagOfWordsPipeline)
assert(bagOfWordsStageNames eqv "normalize-case->tokenize->term-frequency")

mapK

The map method on a traditional List[A] takes a function A => B and transforms the list into a List[B] by running the function on each element in the list. What would it mean for a type-aligned list to have a map method? Consider a type-aligned list with three elements, F[A, B], F[B, C], F[C, D]. How could we possibly pass in a single function that could act upon every element in the list? A traditional function won't work for this. Since A, B, C, and D could all be different types, we need to ensure that the "function" that we pass in can handle F[X, Y] for all possible X and Y types.

The input to the function could look like def apply[X, Y](f: F[X, Y]) to force it to work for all X and Y types, but what would the return type be? If we want to maintain the type-aligned nature of the sequence then the function can't change the X and Y type parameters, but it can transform F into another type: def apply[X, Y](f: F[X, Y]): G[X, Y]. This signature is provided by FunctionK2, which is similar to the FunctionK type in Cats, but FunctionK works on types of the shape F[_] (like Option) while FunctionK2 works on types of the shape F[_, _] (like Either).
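
To make that shape concrete, here is a simplified sketch of such a polymorphic function. The real FunctionK2 trait lives in com.salesforce.maligned and may differ in details; the name SimpleFunctionK2 below is made up for illustration:

// Simplified sketch of the shape described above.
trait SimpleFunctionK2[F[_, _], G[_, _]] {
  // Must work for every choice of X and Y, so it can neither inspect nor change
  // them; it may only transform the outer F into a G.
  def apply[X, Y](f: F[X, Y]): G[X, Y]
}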

We can define a FunctionK2 that transforms each Stage into a Kleisli, and then pass that FunctionK2 instance into mapK.

val stageToKleisli: FunctionK2[Stage, Kleisli[IO, *, *]] =
  new FunctionK2[Stage, Kleisli[IO, *, *]] {
    def apply[A, B](stage: Stage[A, B]): Kleisli[IO, A, B] = Kleisli(stage.apply(_))
  }

val kleisliList: TAList[Kleisli[IO, *, *], String, Map[Token, Int]] =
  bagOfWordsPipeline.mapK(stageToKleisli)

composeAll

At this point it might not be clear why converting from a Stage to a Kleisli might be useful. One nice property of the Kleisli type is that it forms a Category, so a Kleisli[IO, A, B] can be composed with a Kleisli[IO, B, C] to form a Kleisli[IO, A, C] that feeds the output of the first Kleisli as input into the second Kleisli and returns the result of the second Kleisli.
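
As a small illustration of that composition (the parseInt and double values below are made up for this example and aren't part of the pipeline):

// Two small Kleisli arrows composed end to end with andThen.
val parseInt: Kleisli[IO, String, Int] = Kleisli(s => IO(s.trim.toInt))
val double: Kleisli[IO, Int, Int] = Kleisli(n => IO.pure(n * 2))

val parseThenDouble: Kleisli[IO, String, Int] = parseInt andThen double
// parseThenDouble("21").unsafeRunSync == 42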

TAList[F, A, B] has a composeAll method that will use the Category instance for F to compose all of the elements of the list into a single F[A, B]. In this case it can compose our Kleisli pipeline into a single Kleisli.

val toBagOfWords: Kleisli[IO, String, Map[Token, Int]] = kleisliList.composeAll

val output: Map[Token, Int] = toBagOfWords("I think therefore I am").unsafeRunSync
assert(output eqv Map("i" -> 2, "think" -> 1, "therefore" -> 1, "am" -> 1))

making the pipeline more useful

So far we haven’t gotten much of a benefit from using a type-aligned list of Stage elements for our pipeline. We are able to pretty-print the steps of the pipeline which may be a bit useful but probably doesn’t justify the introduction of type-aligned sequences. The effort starts to pay off when we want to add common logic to every stage in a pipeline. For example, when we convert a Stage to a Kleisli, we could make that Kleisli automatically log the name of the stage and the amount of time that it took to run for each input.

First we’ll define a general method for measuring the amount of time that an IO operation takes to run.

def timeIO[A](action: IO[A], reportTime: (Duration, Option[Throwable]) => IO[Unit]): IO[A] =
  for {
    start <- IO(System.nanoTime)
    result <- action.attempt
    end <- IO(System.nanoTime)
    _ <- reportTime(Duration.ofNanos(end - start), result.left.toOption)
    a <- IO.fromEither(result)
  } yield a
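
On its own, timeIO can be used like this (a purely illustrative example; timedAnswer is a made-up value):

// Times a trivial IO and prints how long it took, along with any error.
val timedAnswer: IO[Int] =
  timeIO(IO.pure(41 + 1), (dur, err) => IO(println(s"took ${dur.toNanos} ns (error: $err)")))
// timedAnswer.unsafeRunSync == 42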

Now we can define a new Stage to Kleisli transformation that performs the timing and logging.

val loggedStage = new FunctionK2[Stage, Kleisli[IO, *, *]] {

  def apply[A, B](stage: Stage[A, B]): Kleisli[IO, A, B] = {
    def msg(dur: Duration, err: Option[Throwable]): String =
      s"${stage.name} ${err.fold("succeeded")(_ => "failed")} in ${dur.toNanos} nanoseconds"

    Kleisli { input =>
      timeIO(stage(input), (duration, err) => IO(println(msg(duration, err))))
    }
  }
}

Now that all of the pieces are in place, we can compose a single Kleisli that will run the entire pipeline, timing and logging each stage as it is executed.

val timedBagOfWords: Kleisli[IO, String, Map[Token, Int]] =
  bagOfWordsPipeline.mapK(loggedStage).composeAll

timedBagOfWords("I think therefore I am")
  .flatMap(counts => IO(println(counts.show)))
  .unsafeRunSync
// normalize-case succeeded in 8355 nanoseconds
// tokenize succeeded in 4018 nanoseconds
// term-frequency succeeded in 4405 nanoseconds
// Map(am -> 1, therefore -> 1, think -> 1, i -> 2)

By maintaining a type-aligned sequence of stages, we were able to make a single mapK call to apply this common logic to every stage. If we hadn’t used a type-aligned list, adding logging and timing to each step would have become significantly more repetitive:

val timedBagOfWordsNoTAList: Kleisli[IO, String, Map[Token, Int]] =
  loggedStage(normalizeCase) andThen
    loggedStage(tokenize) andThen
    loggedStage(termFrequency)

For a 3-stage pipeline this might not be too bad, but as the number of stages in the pipeline grows, so will the benefit of using a type-aligned sequence.

is Stage useful?

We ended up converting all of our Stage elements into Kleisli elements, so it’s reasonable to question whether we should have used Kleisli directly and omitted the Stage type entirely. We can’t really beat Kleisli if our goal is strictly to compose together effectful functions. However, function composition might not be the only task that we want to accomplish with our pipeline. The prettyPrintStageNames function that we wrote earlier depended on Stage being a type that we could introspect to grab the name; Kleisli[F, A, B] is an opaque function that we can’t get any information out of unless we have an A to pass into it.

We could also add more fields to Stage to make it even more powerful. For example, we could add input deserializers and output serializers to a Stage:

trait Stage2[A, B] {
  def name: String

  def apply(input: A): IO[B]

  def deserializeInput(input: Array[Byte]): Either[Exception, A]

  def serializeOutput(output: B): Either[Exception, Array[Byte]]
}

We could still convert a pipeline into a Kleisli[IO, A, B] that completely ignores the serializers like we did before. But we could also implement a more sophisticated runtime for the pipeline that measures how long individual stages tend to take and determines whether to run stages within the same process or serialize the data and add it to a queue to be processed in a distributed manner.
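
As a rough sketch of that idea (the byteStage helper below is hypothetical and not part of the library), the serializers would let such a runtime run any stage directly over raw bytes pulled from a queue:

// Hypothetical helper: deserializes the input bytes, runs the stage, and
// serializes the output, surfacing any (de)serialization failure in IO.
def byteStage[A, B](stage: Stage2[A, B]): Kleisli[IO, Array[Byte], Array[Byte]] =
  Kleisli { bytes =>
    for {
      input <- IO.fromEither(stage.deserializeInput(bytes))
      output <- stage(input)
      outBytes <- IO.fromEither(stage.serializeOutput(output))
    } yield outBytes
  }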

The source code for this page can be found here.