Skip to content

Instantly share code, notes, and snippets.

@regis-leray
Last active October 14, 2016 13:54
Show Gist options
  • Select an option

  • Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.

Select an option

Save regis-leray/013dfe030159bcd890ca0d5cd440c938 to your computer and use it in GitHub Desktop.
Valve for Akka stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest._
import org.scalatest.Matchers._
import org.scalatest.Matchers._
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Await
import scala.concurrent.duration._
class ValveSpec extends FlatSpec with ScalaFutures {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = materializer.executionContext
"A closed valve" should "emit only " in {
val (valve, seq) = Source(1 to 3)
.viaMat(new Valve(ValveMode.Closed))(Keep.right)
.toMat(Sink.seq)(Keep.both)
.run()
Thread.sleep(1000)
valve.open
whenReady(seq, timeout(5 seconds)){ sum => sum should contain(1, 2, 3,4) }
}
"A closed valve" should "emit only 3 elements after it has been open" in {
val (valve, probe) = Source(1 to 3)
.viaMat(new Valve(ValveMode.Closed))(Keep.right)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
probe.request(1)
probe.expectNoMsg()
valve.open
probe.expectNext(1)
probe.request(2)
probe.expectNext(2, 3)
probe.expectComplete()
}
it should "emit 5 elements after it has been open/close/open" in {
val (valve, probe) = Source(1 to 5)
.viaMat(new Valve())(Keep.right)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
probe.request(1)
probe.expectNextN(List(1))
valve.close
probe.request(1)
probe.expectNoMsg
valve.open
probe.expectNextN(List(2))
probe.request(3)
probe.expectNextN(List(3,4,5))
probe.expectComplete()
}
}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
trait ValveSwitch {
def open: Unit
def close: Unit
}
class Valve[A](mode: ValveMode = ValveMode.Open) extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
}
private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends GraphStageLogic(shape){
import shape._
var bufferedElement = List.empty[A]
val switch = new ValveSwitch {
override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = List.empty
}
override def close: Unit = {
mode = ValveMode.Closed
}
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in) //acquires the element that has been received during an onPush
println(s"${mode} on push called with $element")
if (mode == ValveMode.Open) {
push(out, element) //push directly the element on the out port
} else {
bufferedElement = bufferedElement :+ element
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
println("on pull called")
pull(in) //request the next element on in port
}
})
}
}
trait ValveMode
object ValveMode {
case object Open extends ValveMode
case object Closed extends ValveMode
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment