Skip to content

Instantly share code, notes, and snippets.

@taraxe
Created April 3, 2013 08:35
Show Gist options
  • Select an option

  • Save taraxe/5299487 to your computer and use it in GitHub Desktop.

Select an option

Save taraxe/5299487 to your computer and use it in GitHub Desktop.
Play2 Scala sampled message stream as EventSource
val (broadcast, channel) = play.api.libs.iteratee.Concurrent.broadcast[JsValue]
def folder(d:Duration):Iteratee[JsValue,(Option[Long], List[JsValue])] = Iteratee.fold2[JsValue,(Option[Long], List[JsValue])]((None:Option[Long], List[JsValue]())){
case ((t,els),e) => {
val now = new Date().getTime
val current = t.getOrElse(now)
val done = now - current >= d.toMillis
Future.successful(((Some(current), e :: els),done))
}
}
def average(d:Duration):Enumeratee[JsValue,JsValue] = Enumeratee.grouped(folder(d).map{ case (_,l) =>
val averageLatency = l.map(x => (x \ "latency").as[Float]).sum / l.size
def transformer = __.json.update((__ \ "latency").json.put(JsNumber(averageLatency)))
l.headOption
.flatMap( _.validate(transformer).asOpt)
.map(_ ++ Json.obj("time"-> new Date().getTime))
.getOrElse(JsNull)
})
def forUser(id:String) = Enumeratee.filter[JsValue](j => (j \ "uuid").as[String] == id)
def stream(id:String) = Action{
Ok.feed(broadcast &> forUser(id) &> average(1 second) &> play.api.libs.EventSource())
.as("text/event-stream")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment