package com.dvMENTALmadness

import akka.actor.ActorSystem
import slick.driver.PostgresDriver.api._
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.control.NonFatal
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global

/** One row of the `my_stream` table. */
case class Record(id: Int, value: String)

/** Slick mapping of the `my_stream` table (columns `id` and `value`) onto [[Record]]. */
class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (Record.tupled, Record.unapply)
}

/**
 * Polls the `my_stream` table for rows whose `id` is greater than the last one
 * seen and prints each new row to standard output as `id, value`.
 */
object Monitor {

  /**
   * Entry point: builds the polling stream, runs it, and blocks until it ends.
   *
   * The source never completes on its own (the unfold step always returns
   * `Some`), so the stream only ends when it fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("Monitor")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val db = Database.forConfig("pg-postgres")
    val queryLimit = 5
    val records = TableQuery[Records]

    try {
      val newRecStream = Source
        .unfoldAsync(0) { lastSeenId =>
          // Order by id so that the last row of the batch really carries the
          // highest id; without an ORDER BY the database may return the rows in
          // any order and the cursor below could skip or repeat records.
          val nextBatch = records
            .filter(_.id > lastSeenId)
            .sortBy(_.id)
            .take(queryLimit)

          db.run(nextBatch.result).map { recs =>
            // An empty batch keeps the cursor where it was.
            val nextCursor = recs.lastOption.fold(lastSeenId)(_.id)
            Some((nextCursor, recs))
          }
        }
        // Let at most one batch through per second.
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
        .flatMapConcat(recs => Source.fromIterator(() => recs.iterator))
        .runForeach(rec => println(s"${rec.id}, ${rec.value}"))

      // Await.result (unlike Await.ready) rethrows the stream's failure, so a
      // failed query is reported by the handler below instead of being dropped.
      Await.result(newRecStream, Duration.Inf)
    } catch {
      // Fatal JVM errors are deliberately left to propagate.
      case NonFatal(ex) => println(ex)
    } finally {
      // NOTE(review): `shutdown` is deprecated in favour of `terminate()` on
      // newer Akka releases - switch once the Akka version in use is confirmed.
      try system.shutdown()
      finally db.close() // close the database even if actor-system shutdown throws
    }
  }
}