Skip to content

Instantly share code, notes, and snippets.

@RobertoUa
Created March 14, 2022 10:22
Show Gist options
  • Select an option

  • Save RobertoUa/189d6ccd3aa82021ed9f22af145c1bca to your computer and use it in GitHub Desktop.

Select an option

Save RobertoUa/189d6ccd3aa82021ed9f22af145c1bca to your computer and use it in GitHub Desktop.
fs2 groupWeighedWithin
/** Extension methods that add weight-aware grouping to an fs2 `Stream`. */
implicit class StreamOps[F[_], O](private val s: Stream[F, O]) extends AnyVal {

  /** Partitions the stream into groups bounded by accumulated weight and by time.
    *
    * Elements are appended to the open group until the next element would bring the group's
    * total weight to `limit` or beyond; the open group is then emitted and that element starts
    * the next group. An element whose own weight already reaches `limit` therefore ends up in a
    * group by itself. Independently of weight, a non-empty group is emitted once `timeout` has
    * elapsed after the group was started, and whatever is still buffered is emitted when the
    * source terminates. Empty groups are never emitted.
    *
    * The limit is applied per element, so upstream chunk boundaries do not influence the
    * grouping, and the timer is armed once per group rather than once per pull, so a source
    * that keeps producing elements cannot postpone the time-based flush.
    *
    * @param timeout maximum time a started group is held back before it is emitted
    * @param limit   weight threshold; a group's total weight stays below it unless a single
    *                element reaches it on its own (weights are expected to be non-negative)
    * @param weight  computes the weight of one element
    * @return the elements of the source, in order, partitioned into non-empty groups
    */
  def groupWeighedWithin(timeout: FiniteDuration, limit: Long, weight: O => Long)(implicit
      F: Temporal[F]
  ): Stream[F, Chunk[O]] = {

    // Emits `group` as one output element; empty groups are dropped.
    def emit(group: Chunk[O]): Pull[F, Chunk[O], Unit] =
      if (group.isEmpty) Pull.done else Pull.output1(group)

    // Distributes `pending` over the open group element by element.
    //   closed     - groups completed so far while processing this chunk
    //   open       - elements of the open group that are not part of `pending`
    //   openWeight - weight of the open group, including the first `taken` elements of `pending`
    //   taken      - number of leading elements of `pending` already assigned to the open group
    // Returns the completed groups together with the still-open group and its weight.
    @scala.annotation.tailrec
    def regroup(
        closed: Chunk[Chunk[O]],
        open: Chunk[O],
        openWeight: Long,
        pending: Chunk[O],
        taken: Int
    ): (Chunk[Chunk[O]], Chunk[O], Long) =
      if (taken >= pending.size) (closed, open ++ pending, openWeight)
      else {
        val w = weight(pending(taken))
        if (openWeight + w >= limit) {
          // The element at `taken` does not fit: close the open group in front of it and let
          // the element start the next group.
          val (head, tail) = pending.splitAt(taken)
          val full = open ++ head
          val nowClosed = if (full.isEmpty) closed else closed ++ Chunk.singleton(full)
          regroup(nowClosed, Chunk.empty, w, tail, 1)
        } else regroup(closed, open, openWeight + w, pending, taken + 1)
      }

    s.pull
      .timed { timedPull =>
        def go(
            timedPull: Pull.Timed[F, O],
            buffer: Chunk[O],
            bufferSize: Long
        ): Pull[F, Chunk[O], Unit] =
          timedPull.uncons.flatMap {
            case Some((Right(elems), next)) =>
              val (closed, open, openSize) = regroup(Chunk.empty, buffer, bufferSize, elems, 0)
              // A group was started if one was closed (its successor is now open) or if the
              // buffer went from empty to non-empty.
              val startedGroup = closed.nonEmpty || (buffer.isEmpty && open.nonEmpty)
              if (startedGroup)
                // Arm the timer only here, so later chunks joining the same group do not
                // extend its deadline; arming also cancels any timeout still pending.
                Pull.output(closed) >> next.timeout(timeout) >> go(next, open, openSize)
              else
                go(next, open, openSize)
            case Some((Left(_), next)) =>
              // Timeout: flush what has been collected; the timer is re-armed when the next
              // group starts.
              emit(buffer) >> go(next, Chunk.empty, 0L)
            case None =>
              // Source finished: flush the remainder.
              emit(buffer)
          }

        go(timedPull, Chunk.empty, 0L)
      }
      .stream
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment