Building a generic element counter that transiently counts the elements that go through a fs2 Stream
During our migration from Akka to the Typelevel stack, we're switching from Akka Streams to fs2.
For one of our services, we needed something that counts the elements that go through a stream and writes a metric for it.
Previously that was done using an AtomicInteger, which worked alright. Now that we're in purely functional land, though, writing metrics and mutating variables both count as side effects and have to be expressed as IO actions. That rules out a bare AtomicInteger (one could use cats.effect.Ref as an almost-identical replacement, but we can do better than that).
Implementing this turned out to be a nice little exercise, so I thought it would make sense to write down how we did it.
So, suppose we have a stream of product data:
val products: Stream[IO, Product] = ???
The obvious way to count the elements is:
val count: IO[Long] = products.compile.count
Counting elements is not that hard, but in our case it's a bit more involved because we have to count the elements "in the middle of the stream" and can't move it to the end.
That means that we have to implement a Pipe that does the counting but doesn't swallow any of the elements:
val countItems: Pipe[IO, Product, Product] = ???
To reinforce the fact that counting must not swallow elements (and also must not modify the elements in the stream), we replace the concrete type with a type parameter:
def countItems[A]: Pipe[IO, A, A] = ???
The basic idea for the component is to branch off the stream to a substream that does the counting and reporting for us.
Such a fan-out can be done using broadcastThrough from fs2, so the skeleton of our counter looks like this:
def countItems[A](report: Long => IO[Unit]): Pipe[IO, A, A] = {
  _.broadcastThrough(identity[Stream[IO, A]])
}
This doesn't do anything yet; it pipes all the elements to a "no-op" pipe.
Since Pipe is just an alias for Stream[F, I] => Stream[F, O], we can leverage the identity function here.
No rocket science, but quite pleasing.
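To make the "a Pipe is just a function" point concrete, here is a minimal sketch that binds identity to a Pipe-typed value and broadcasts a small stream through it. The object name NoOpPipeSketch, the value names, and the scala-cli dependency line (including the version) are assumptions for illustration; any fs2 3.x build setup should do.

```scala
// Assumed scala-cli directive; adjust the version to whatever fs2 3.x you use.
//> using dep "co.fs2::fs2-core:3.9.3"

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}

object NoOpPipeSketch {
  // Pipe[F, I, O] is a type alias for Stream[F, I] => Stream[F, O],
  // so the identity function is a valid pipe that changes nothing.
  val noOp: Pipe[IO, Int, Int] = identity

  // Broadcasting through the single no-op pipe passes every element through.
  val passedThrough: List[Int] =
    Stream(1, 2, 3)
      .covary[IO]
      .broadcastThrough(noOp)
      .compile
      .toList
      .unsafeRunSync()
}
```

With only one pipe attached, the output should be the input, unchanged and in order.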
For counting the elements, we want to use .compile.count as described above, but that gives us an IO[Long] (or an IO[Unit] once we chain the report action onto it) instead of a Pipe.
We need such a pipe though, otherwise we can't plug it into broadcastThrough.
We can fix that by lifting the IO action into Stream and then appending an empty stream:
val countPipe: Pipe[IO, A, Nothing] = (inputs: Stream[IO, A]) =>
  Stream
    .eval(inputs.compile.count.flatMap(report))
    .flatMap(_ => Stream.empty)
The output type of the pipe is Nothing, which is a nice trick to guarantee that no elements will be emitted from the pipe.
Actually, there's no way for the pipe to come up with elements of type A, but using Nothing makes it even more obvious.
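The behaviour of such a Nothing-typed pipe can be checked in isolation. The following is a rough sketch, not the production code: it specialises the counting pipe to Int so it runs standalone, and the names CountPipeSketch and outcome, the -1 sentinel, and the scala-cli dependency line are all assumptions made for the example.

```scala
// Assumed scala-cli directive; adjust the version to whatever fs2 3.x you use.
//> using dep "co.fs2::fs2-core:3.9.3"

import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.{Pipe, Stream}

object CountPipeSketch {
  // Same shape as the counting pipe in the text, specialised to Int.
  def countPipe(report: Long => IO[Unit]): Pipe[IO, Int, Nothing] =
    (inputs: Stream[IO, Int]) =>
      Stream
        .eval(inputs.compile.count.flatMap(report))
        .flatMap(_ => Stream.empty)

  // Returns (elements emitted by the pipe, count handed to report).
  val outcome: (List[Nothing], Long) = (for {
    ref     <- Ref.of[IO, Long](-1L) // -1 marks "report was never called"
    emitted <- Stream(10, 20, 30).covary[IO].through(countPipe(ref.set)).compile.toList
    count   <- ref.get
  } yield (emitted, count)).unsafeRunSync()
}
```

The pipe consumes all three inputs and runs the report action, yet the list of emitted elements stays empty.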
Now we can put the pieces together:
def countItems[A](report: Long => IO[Unit]): Pipe[IO, A, A] = {
  val countPipe: Pipe[IO, A, Nothing] = (inputs: Stream[IO, A]) =>
    Stream
      .eval(inputs.compile.count.flatMap(report))
      .flatMap(_ => Stream.empty)
  _.broadcastThrough(countPipe, identity[Stream[IO, A]])
}
And we're done!
If we want, we could go the extra mile and make the counter abstract over the effect type:
import cats.effect.Concurrent
import cats.syntax.all._ // provides flatMap on the abstract F[Long]
def countItems[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = {
  val countPipe: Pipe[F, A, Nothing] = (inputs: Stream[F, A]) =>
    Stream
      .eval(inputs.compile.count.flatMap(report))
      .flatMap(_ => Stream.empty)
  _.broadcastThrough(countPipe, identity[Stream[F, A]])
}
but that's entirely optional and is probably more a matter of taste than anything else.
The most straightforward example would be:
val example1: IO[Unit] = for {
  ref <- Ref.of[IO, Option[Long]](Option.empty[Long])
  result <- Stream
    .emits(List(1, 2, 3, 4, 5, 6))
    .through(elementCounter.doCountItems(l => ref.set(Some(l))))
    .compile
    .toList
  count <- ref.get
  _ <- IO.println(result)
  _ <- IO.println(count)
} yield ()
which will print
List(1, 2, 3, 4, 5, 6)
Some(6)
So the elements after the pipe are the same as before, and the Ref contains the number of elements that went through the stream.
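For readers who want to run this first example on its own, here is a self-contained sketch. It assumes the effect-polymorphic counter is in scope under the name countItems (rather than elementCounter.doCountItems) and binds the identity pipe to a name; the object name CountItemsSketch, the value outcome, and the scala-cli dependency line are hypothetical.

```scala
// Assumed scala-cli directive; adjust the version to whatever fs2 3.x you use.
//> using dep "co.fs2::fs2-core:3.9.3"

import cats.effect.{Concurrent, IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import fs2.{Pipe, Stream}

object CountItemsSketch {
  // The counter from the text, repeated so this sketch compiles on its own.
  def countItems[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = {
    // Counts everything it sees, reports once, emits nothing.
    val countPipe: Pipe[F, A, Nothing] = (inputs: Stream[F, A]) =>
      Stream
        .eval(inputs.compile.count.flatMap(report))
        .flatMap(_ => Stream.empty)
    // The identity pipe, bound to a name for readability.
    val passThrough: Pipe[F, A, A] = identity
    _.broadcastThrough(countPipe, passThrough)
  }

  // Returns (elements after the pipe, count stored by the report action).
  val outcome: (List[Int], Option[Long]) = (for {
    ref    <- Ref.of[IO, Option[Long]](Option.empty[Long])
    result <- Stream
                .emits(List(1, 2, 3, 4, 5, 6))
                .covary[IO]
                .through(countItems[IO, Int](l => ref.set(Some(l))))
                .compile
                .toList
    count  <- ref.get
  } yield (result, count)).unsafeRunSync()
}
```

Because the broadcast stream only finishes once every attached pipe has finished, the Ref is already populated by the time compile.toList returns.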
The second example is a tiny bit more complex and shows the advantage of the counter being a Pipe:
val example2: IO[Unit] = for {
  ref <- Ref.of[IO, Option[Long]](Option.empty[Long])
  result <- Stream
    .emits(List(1, 2, 3, 4, 5, 6))
    .through(elementCounter.doCountItems(l => ref.set(Some(l))))
    .take(3)
    .compile
    .toList
  count <- ref.get
  _ <- IO.println(result)
  _ <- IO.println(count)
} yield ()
which will print
List(1, 2, 3)
Some(6)
Since we truncate the stream with take(3) after the counter, it reports 6 elements, while the end result only contains the first three.
It's relatively easy to write a component that transiently counts the number of elements that go through a stream. Along the way we made use of a couple of nice tricks:
- broadcastThrough is the perfect primitive in fs2 to implement a fan-out
- a Pipe is just a function, so we can use the identity function to implement a pipe that doesn't do anything
- a Stream with the output type Nothing can't ever emit any elements. It must be either empty or never ending. We can use that to guarantee that the counting pipe doesn't emit any elements
- we end up with a Pipe that can be plugged into any stream. It's self-contained in that the user does not need to take care of actually calling the report action after the stream has ended.
After I shared this post in the fs2 channel of the Typelevel Discord, I was made aware that my implementation can be simplified:
def countItems[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = {
  val countPipe: Pipe[F, A, Nothing] = (inputs: Stream[F, A]) =>
    Stream.exec(inputs.compile.count.flatMap(report))
  _.broadcastThrough(countPipe, identity[Stream[F, A]])
}
In addition to that, it's probably more performant to not use broadcastThrough but instead use evalTapChunk in combination with a Ref:
def counter[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = inputs =>
  for {
    ref <- Stream.bracket(Ref.of[F, Long](0L))(_.get.flatMap(report))
    a <- inputs.evalTapChunk(_ => ref.update(_ + 1))
  } yield a
which has the same API as my initial version.
Furthermore, it's probably even more performant to count the elements based on the Chunks that go through the stream:
def counter[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = inputs =>
  for {
    ref <- Stream.bracket(Ref.of[F, Long](0L))(_.get.flatMap(report))
    a <- inputs.chunks.evalTap(c => ref.update(_ + c.size)).unchunks
  } yield a
I haven't done any benchmarks (yet) to verify that this is actually the fastest variant.
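Short of a benchmark, one can at least check that the two Ref-based variants agree with each other. The sketch below does only that and says nothing about performance. The names RefCounterSketch, counterPerElement, counterPerChunk, and run are hypothetical (the two variants are renamed so both fit in one file), and the scala-cli dependency line is an assumption.

```scala
// Assumed scala-cli directive; adjust the version to whatever fs2 3.x you use.
//> using dep "co.fs2::fs2-core:3.9.3"

import cats.effect.{Concurrent, IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._
import fs2.{Pipe, Stream}

object RefCounterSketch {
  // Per-element variant: one Ref update per element.
  def counterPerElement[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = inputs =>
    for {
      ref <- Stream.bracket(Ref.of[F, Long](0L))(_.get.flatMap(report))
      a   <- inputs.evalTapChunk(_ => ref.update(_ + 1))
    } yield a

  // Per-chunk variant: one Ref update per chunk, adding the chunk size.
  def counterPerChunk[F[_]: Concurrent, A](report: Long => F[Unit]): Pipe[F, A, A] = inputs =>
    for {
      ref <- Stream.bracket(Ref.of[F, Long](0L))(_.get.flatMap(report))
      a   <- inputs.chunks.evalTap(c => ref.update(_ + c.size)).unchunks
    } yield a

  // Runs a counter over a six-element stream built from two chunks and
  // returns (elements after the pipe, count handed to report).
  def run(mkPipe: (Long => IO[Unit]) => Pipe[IO, Int, Int]): (List[Int], Long) =
    (for {
      reported <- Ref.of[IO, Long](-1L) // -1 marks "report was never called"
      out      <- (Stream(1, 2, 3) ++ Stream(4, 5, 6))
                    .covary[IO]
                    .through(mkPipe(reported.set))
                    .compile
                    .toList
      n        <- reported.get
    } yield (out, n)).unsafeRunSync()

  val perElement: (List[Int], Long) = run(report => counterPerElement[IO, Int](report))
  val perChunk: (List[Int], Long)   = run(report => counterPerChunk[IO, Int](report))
}
```

Both variants report through the release action of Stream.bracket, so the count is handed to report when the stream's scope closes, before compile.toList returns.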