package pl.ayeo import cats.effect.IO import cats.effect.Ref import cats.effect.std.Queue import cats.effect.unsafe.implicits.global import cats.syntax.all.* import fs2.{Pipe, Stream} import scala.concurrent.duration.* object QueueApp extends App { def i1 = Stream.repeatEval(IO.readLine.map(_.toIntOption)) def i2 = Stream.repeatEval(IO.sleep(1.second) >> IO(Some(1))) def actor2(adder: IO[Adder]): Pipe[IO, Option[Int], IO[Option[Int]]] = stream => stream.map(input => adder.map { a => input.map { integer => a.run(integer) }.sequence }.flatten ) def logger: Pipe[IO, IO[Option[Int]], Unit] = _.map(println) def a2(i: Option[Int]): Option[Int] = i.map(_ + 2) case class Adder(c: Ref[IO, Int]) { def run(i: Int): IO[Int] = c.modify(d => (i + d, i)) } val ioAdder: IO[Adder] = Ref[IO].of(0).map(Adder(_)) i1.merge(i2).through(actor2(ioAdder)).through(logger).compile.drain.unsafeRunSync() }