Last active
October 14, 2016 13:54
-
-
Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.
Revisions
-
regis-leray revised this gist
Oct 14, 2016 . 2 changed files with 45 additions and 42 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,43 +15,39 @@ class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMateriali (logic, logic.switch) } private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape) with InHandler with OutHandler{ import shape._ var bufferedElement = Option.empty[A] val switch = new ValveSwitch { val callback = getAsyncCallback[A](push(out, _)) override def open: Unit = { mode = ValveMode.Open bufferedElement.foreach(callback.invoke) bufferedElement = Option.empty } override def close: Unit = { mode = ValveMode.Closed } } setHandlers(in,out, this) override def onPush(): Unit = { val element = grab(in) //acquires the element that has been received during an onPush if (mode == ValveMode.Open) { push(out, element) //push directly the element on the out port } else { bufferedElement = Some(element) } } override def onPull(): Unit = pull(in) //request the next element on in port } } @@ -60,4 +56,4 @@ trait ValveMode object ValveMode { case object Open extends ValveMode case object Closed extends ValveMode } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,13 +1,14 @@ import akka.actor.ActorSystem import akka.pattern.after import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.TestSink import org.scalatest._ import org.scalatest.Matchers._ import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Future import scala.concurrent.duration._ class ValveSpec extends FlatSpec with ScalaFutures { @@ -23,26 +24,30 @@ class ValveSpec extends FlatSpec with ScalaFutures { .toMat(Sink.seq)(Keep.both) .run() after(100 millis, system.scheduler) { Future.successful(valve.open) } whenReady(seq, timeout(200 millis)){ sum => sum should contain inOrder (1, 2, 3) } } "A closed valve" should "emit only 5 elements after it has been open" in { val (valve, probe) = Source(1 to 5) .viaMat(new Valve(ValveMode.Closed))(Keep.right) .toMat(TestSink.probe[Int])(Keep.both) .run() probe.request(2) probe.expectNoMsg(100 millis) valve.open probe.expectNext shouldEqual 1 probe.expectNext shouldEqual 2 probe.request(3) probe.expectNext shouldEqual 3 probe.expectNext shouldEqual 4 probe.expectNext shouldEqual 5 probe.expectComplete() } @@ -53,23 +58,25 @@ class ValveSpec extends FlatSpec with ScalaFutures { .toMat(TestSink.probe[Int])(Keep.both) .run() probe.request(2) probe.expectNext() shouldEqual 1 probe.expectNext() shouldEqual 2 valve.close probe.request(1) probe.expectNoMsg(100 millis) valve.open probe.expectNext() shouldEqual 3 probe.request(2) probe.expectNext() shouldEqual 4 probe.expectNext() shouldEqual 5 probe.expectComplete() } } -
regis-leray revised this gist
Oct 13, 2016 . 1 changed file with 75 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,75 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.scaladsl.TestSink import org.scalatest._ import org.scalatest.Matchers._ import org.scalatest.Matchers._ import org.scalatest.concurrent.ScalaFutures import scala.concurrent.Await import scala.concurrent.duration._ class ValveSpec extends FlatSpec with ScalaFutures { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() implicit val executionContext = materializer.executionContext "A closed valve" should "emit only " in { val (valve, seq) = Source(1 to 3) .viaMat(new Valve(ValveMode.Closed))(Keep.right) .toMat(Sink.seq)(Keep.both) .run() Thread.sleep(1000) valve.open whenReady(seq, timeout(5 seconds)){ sum => sum should contain(1, 2, 3,4) } } "A closed valve" should "emit only 3 elements after it has been open" in { val (valve, probe) = Source(1 to 3) .viaMat(new Valve(ValveMode.Closed))(Keep.right) .toMat(TestSink.probe[Int])(Keep.both) .run() probe.request(1) probe.expectNoMsg() valve.open probe.expectNext(1) probe.request(2) probe.expectNext(2, 3) probe.expectComplete() } it should "emit 5 elements after it has been open/close/open" in { val (valve, probe) = Source(1 to 5) .viaMat(new Valve())(Keep.right) .toMat(TestSink.probe[Int])(Keep.both) .run() probe.request(1) probe.expectNextN(List(1)) valve.close probe.request(1) probe.expectNoMsg valve.open probe.expectNextN(List(2)) probe.request(3) probe.expectNextN(List(3,4,5)) probe.expectComplete() } } -
regis-leray renamed this gist
Oct 13, 2016 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
regis-leray renamed this gist
Oct 13, 2016 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
regis-leray created this gist
Oct 13, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,63 @@ import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler} import akka.stream.{Attributes, FlowShape, Inlet, Outlet} trait ValveSwitch { def open: Unit def close: Unit } class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] { override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out")) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = { val logic = new ValveGraphStageLogic(shape, mode) (logic, logic.switch) } private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){ import shape._ var bufferedElement = List.empty[A] val switch = new ValveSwitch { override def open: Unit = { mode = ValveMode.Open println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}") bufferedElement.foreach(push(out, _)) bufferedElement = List.empty } override def close: Unit = { mode = ValveMode.Closed } } setHandler(in, new InHandler { override def onPush(): Unit = { val element = grab(in) //acquires the element that has been received during an onPush println(s"${mode} on push called with $element") if (mode == ValveMode.Open) { push(out, element) //push directly the element on the out port } else { bufferedElement = bufferedElement :+ element } } }) setHandler(out, new OutHandler { override def onPull(): Unit = { println("on pull called") pull(in) //request the next element on in port } }) } } trait ValveMode object ValveMode { case object Open extends ValveMode case object Closed extends ValveMode }