package com.rxcorp.cesespoke.filewatcher.obs

import monix.reactive.{Consumer, Observable, Observer}
import monix.eval.Task._
import monix.execution.Ack
import monix.execution.Ack._
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

/**
 * Small runnable demo: a timed stream of random integers is split into one
 * sub-stream per distinct value, and every sub-stream is drained by its own
 * printing consumer.
 */
object Example extends App {

  /**
   * Observer that prints each element it receives and asks upstream to stop
   * once it has seen its fifth element.
   *
   * @param key the group key this observer is attached to; only used to
   *            prefix the printed messages
   */
  private final class CountingObserver(key: Int) extends Observer[Int] {
    // Number of elements received so far.
    private var received = 0
    private val tag = s"$key Consumer."

    override def onNext(elem: Int): Future[Ack] = {
      println(s"$tag Received $elem")
      received += 1
      // Back-pressure signal: keep going until exactly the fifth element.
      if (received != 5) Continue else Stop
    }

    override def onError(ex: Throwable): Unit =
      println(s"$tag ERROR: $ex")

    override def onComplete(): Unit =
      println(s"$tag completed with $received")
  }

  /**
   * Builds a consumer that logs every element of one group to stdout.
   *
   * @param key the key of the group being consumed (used as message prefix)
   * @return a consumer backed by a fresh counting observer per subscription
   */
  def intConsumer(key: Int): Consumer[Int, Unit] =
    Consumer.fromObserver(_ => new CountingObserver(key))

  /** Upper bound (inclusive) of the generated values; also the group/parallelism limit. */
  val maxNo = 5

  /** Emits, on every 100 ms tick, a random integer between 0 and `maxNo` inclusive. */
  val source =
    Observable
      .interval(100.millis)
      .mapTask(_ => eval(Random.nextInt(maxNo + 1)))

  /**
   * Groups the source by value, keeps the first `maxNo` groups and consumes
   * them with `intConsumer`, running at most `maxNo` consumers concurrently.
   * The result is a task that finishes when the resulting stream completes.
   */
  val grouped =
    source
      .groupBy(identity)
      .take(maxNo.toLong)
      .mapAsync(maxNo)(group => group.consumeWith(intConsumer(group.key)))
      .completedL

  // Block the main thread until the whole pipeline has finished.
  Await.result(grouped.runAsync, Duration.Inf)
}