/**
 * Stand-alone WebSocket client.
 *
 * Opens a WebSocket connection to the event-updates endpoint, sends a single
 * `subscribeAll` command as a JSON text frame, and forwards every message
 * received from the server to the `greeter` actor.
 */
object Client extends App with CirceSupport {
  // Imported locally so this object does not depend on the file-level import list.
  import akka.http.scaladsl.model.ws.{Message, TextMessage}
  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("client")
  // Execution context used for the handshake callback at the bottom of this object.
  import system.dispatcher

  // Create the 'greeter' actor
  val greeter = system.actorOf(Props[Greeter], "greeter")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  val log = system.log

  // The following members are not referenced by the WebSocket wiring below.
  // They are kept because they are public members of this object.
  val persistJson = Flow[Json]

  val start = ByteString.empty
  val sep = ByteString("\n")
  val end = ByteString.empty

  // JSON entity streaming with elements separated by a newline and no
  // opening or closing delimiter.
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))

  val decoder = Decoder[Json]
  val jsonFlow = Flow[Json]

  // Build a real JSON object. `Json.fromString` on the raw text would produce
  // a JSON *string* whose content merely looks like an object.
  val subscribeCommand: Json = Json.obj("command" -> Json.fromString("subscribeAll"))

  // Emits the subscribe command, then stays open via `Source.maybe` so the
  // outgoing side of the connection is not completed after the first element.
  val source: Source[Json, Promise[Option[Json]]] =
    Source(List(subscribeCommand))
      .concatMat(Source.maybe[Json])(Keep.right)

  // WebSocket flows carry `Message`, so each JSON value is rendered to a
  // compact text frame before being sent.
  val outgoing: Source[Message, Promise[Option[Json]]] =
    source.map(json => TextMessage(json.noSpaces))

  // Hands every incoming WebSocket message to the greeter actor unchanged.
  // NOTE(review): Greeter is not visible here — confirm it handles `Message`
  // and drains streamed payloads (`TextMessage.Streamed` / `BinaryMessage.Streamed`).
  val sink = Sink.foreach[Message](message => greeter ! message)

  // `fromSinkAndSource` discards both materialized values, so the promise from
  // `Source.maybe` is not reachable through this flow.
  val clientFlow: Flow[Message, Message, NotUsed] =
    Flow.fromSinkAndSource(sink, outgoing)

  val (upgradeResponse, _) = Http().singleWebSocketRequest(
    WebSocketRequest("ws://gtp2betastream.systest.williamhill.plc:8080/eventUpdates"),
    clientFlow
  )

  // Report the outcome of the handshake instead of silently dropping a failed future.
  upgradeResponse.onComplete {
    case Success(upgrade) =>
      log.info("WebSocket handshake completed with status {}", upgrade.response.status)
    case Failure(cause) =>
      log.error(cause, "WebSocket connection failed")
  }
}