Skip to content

Instantly share code, notes, and snippets.

@regis-leray
Last active October 14, 2016 13:54
Show Gist options
  • Select an option

  • Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.

Select an option

Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.

Revisions

  1. regis-leray revised this gist Oct 14, 2016. 2 changed files with 45 additions and 42 deletions.
    42 changes: 19 additions & 23 deletions valve-akka-stream.scala → Valve.scala
    Original file line number Diff line number Diff line change
    @@ -15,43 +15,39 @@ class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMateriali
    (logic, logic.switch)
    }

    private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
    private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape) with InHandler with OutHandler{
    import shape._

    var bufferedElement = List.empty[A]

    var bufferedElement = Option.empty[A]

    val switch = new ValveSwitch {
    val callback = getAsyncCallback[A](push(out, _))

    override def open: Unit = {
    mode = ValveMode.Open
    println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

    bufferedElement.foreach(push(out, _))
    bufferedElement = List.empty
    bufferedElement.foreach(callback.invoke)
    bufferedElement = Option.empty
    }

    override def close: Unit = {
    mode = ValveMode.Closed
    }
    }

    setHandler(in, new InHandler {
    override def onPush(): Unit = {
    val element = grab(in) //acquires the element that has been received during an onPush
    println(s"${mode} on push called with $element")
    if (mode == ValveMode.Open) {
    push(out, element) //push directly the element on the out port
    } else {
    bufferedElement = bufferedElement :+ element
    }
    }
    })
    setHandlers(in,out, this)

    setHandler(out, new OutHandler {
    override def onPull(): Unit = {
    println("on pull called")
    pull(in) //request the next element on in port
    override def onPush(): Unit = {
    val element = grab(in) //acquires the element that has been received during an onPush
    if (mode == ValveMode.Open) {
    push(out, element) //push directly the element on the out port
    } else {
    bufferedElement = Some(element)
    }
    })
    }


    override def onPull(): Unit = pull(in) //request the next element on in port
    }
    }

    @@ -60,4 +56,4 @@ trait ValveMode
    object ValveMode {
    case object Open extends ValveMode
    case object Closed extends ValveMode
    }
    }
    45 changes: 26 additions & 19 deletions test-valve-akka-stream.scala → ValveSpec.scala
    Original file line number Diff line number Diff line change
    @@ -1,13 +1,14 @@

    import akka.actor.ActorSystem
    import akka.pattern.after
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.stream.testkit.scaladsl.TestSink
    import org.scalatest._
    import org.scalatest.Matchers._
    import org.scalatest.Matchers._
    import org.scalatest.concurrent.ScalaFutures

    import scala.concurrent.Await
    import scala.concurrent.Future
    import scala.concurrent.duration._

    class ValveSpec extends FlatSpec with ScalaFutures {
    @@ -23,26 +24,30 @@ class ValveSpec extends FlatSpec with ScalaFutures {
    .toMat(Sink.seq)(Keep.both)
    .run()

    Thread.sleep(1000)
    valve.open
    after(100 millis, system.scheduler) {
    Future.successful(valve.open)
    }

    whenReady(seq, timeout(5 seconds)){ sum => sum should contain(1, 2, 3,4) }
    whenReady(seq, timeout(200 millis)){ sum => sum should contain inOrder (1, 2, 3) }
    }

    "A closed valve" should "emit only 3 elements after it has been open" in {
    val (valve, probe) = Source(1 to 3)
    "A closed valve" should "emit only 5 elements after it has been open" in {
    val (valve, probe) = Source(1 to 5)
    .viaMat(new Valve(ValveMode.Closed))(Keep.right)
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

    probe.request(1)
    probe.expectNoMsg()
    probe.request(2)
    probe.expectNoMsg(100 millis)

    valve.open
    probe.expectNext(1)
    probe.expectNext shouldEqual 1
    probe.expectNext shouldEqual 2

    probe.request(2)
    probe.expectNext(2, 3)
    probe.request(3)
    probe.expectNext shouldEqual 3
    probe.expectNext shouldEqual 4
    probe.expectNext shouldEqual 5

    probe.expectComplete()
    }
    @@ -53,23 +58,25 @@ class ValveSpec extends FlatSpec with ScalaFutures {
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

    probe.request(1)
    probe.expectNextN(List(1))
    probe.request(2)
    probe.expectNext() shouldEqual 1
    probe.expectNext() shouldEqual 2

    valve.close

    probe.request(1)
    probe.expectNoMsg
    probe.expectNoMsg(100 millis)

    valve.open
    probe.expectNextN(List(2))
    probe.expectNext() shouldEqual 3

    probe.request(3)
    probe.expectNextN(List(3,4,5))
    probe.request(2)
    probe.expectNext() shouldEqual 4
    probe.expectNext() shouldEqual 5

    probe.expectComplete()
    }



    }
    }
  2. regis-leray revised this gist Oct 13, 2016. 1 changed file with 75 additions and 0 deletions.
    75 changes: 75 additions & 0 deletions test-valve-akka-stream.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,75 @@
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.stream.testkit.scaladsl.TestSink
    import org.scalatest._
    import org.scalatest.Matchers._
    import org.scalatest.Matchers._
    import org.scalatest.concurrent.ScalaFutures

    import scala.concurrent.Await
    import scala.concurrent.duration._

    class ValveSpec extends FlatSpec with ScalaFutures {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = materializer.executionContext

    "A closed valve" should "emit only " in {

    val (valve, seq) = Source(1 to 3)
    .viaMat(new Valve(ValveMode.Closed))(Keep.right)
    .toMat(Sink.seq)(Keep.both)
    .run()

    Thread.sleep(1000)
    valve.open

    whenReady(seq, timeout(5 seconds)){ sum => sum should contain(1, 2, 3,4) }
    }

    "A closed valve" should "emit only 3 elements after it has been open" in {
    val (valve, probe) = Source(1 to 3)
    .viaMat(new Valve(ValveMode.Closed))(Keep.right)
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

    probe.request(1)
    probe.expectNoMsg()

    valve.open
    probe.expectNext(1)

    probe.request(2)
    probe.expectNext(2, 3)

    probe.expectComplete()
    }

    it should "emit 5 elements after it has been open/close/open" in {
    val (valve, probe) = Source(1 to 5)
    .viaMat(new Valve())(Keep.right)
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

    probe.request(1)
    probe.expectNextN(List(1))

    valve.close

    probe.request(1)
    probe.expectNoMsg

    valve.open
    probe.expectNextN(List(2))

    probe.request(3)
    probe.expectNextN(List(3,4,5))

    probe.expectComplete()
    }



    }
  3. regis-leray renamed this gist Oct 13, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  4. regis-leray renamed this gist Oct 13, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  5. regis-leray created this gist Oct 13, 2016.
    63 changes: 63 additions & 0 deletions scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,63 @@
    import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
    import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

    trait ValveSwitch {
    def open: Unit
    def close: Unit
    }

    class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

    override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

    override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
    }

    private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
    import shape._

    var bufferedElement = List.empty[A]

    val switch = new ValveSwitch {
    override def open: Unit = {
    mode = ValveMode.Open
    println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

    bufferedElement.foreach(push(out, _))
    bufferedElement = List.empty
    }

    override def close: Unit = {
    mode = ValveMode.Closed
    }
    }

    setHandler(in, new InHandler {
    override def onPush(): Unit = {
    val element = grab(in) //acquires the element that has been received during an onPush
    println(s"${mode} on push called with $element")
    if (mode == ValveMode.Open) {
    push(out, element) //push directly the element on the out port
    } else {
    bufferedElement = bufferedElement :+ element
    }
    }
    })

    setHandler(out, new OutHandler {
    override def onPull(): Unit = {
    println("on pull called")
    pull(in) //request the next element on in port
    }
    })
    }
    }

    trait ValveMode

    object ValveMode {
    case object Open extends ValveMode
    case object Closed extends ValveMode
    }