Last active
July 25, 2017 13:01
-
-
Save ufosky/faa6b37b4eaaf8ab468f98663c314b8f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import akka.actor.ActorSystem | |
| import akka.event.{LogSource, Logging} | |
| import akka.http.scaladsl.Http | |
| import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller} | |
| import akka.http.scaladsl.model._ | |
| import akka.http.scaladsl.server.{RejectionError, ValidationRejection} | |
| import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshal, Unmarshaller} | |
| import akka.stream._ | |
| import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergePreferred, Sink, Source, Zip} | |
| import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
| import akka.util.ByteString | |
| import play.api.libs.json._ | |
| import scala.concurrent.duration._ | |
| import scala.concurrent.{Await, Future} | |
| import scala.util.{Failure, Random, Success} | |
/**
 * One answer to a question.
 *
 * @param content text of the answer
 * @param url     full-length URL pointing at the answer
 */
final case class Answer(content: String, url: String)

/**
 * A question together with its answers.
 *
 * @param title   title of the question
 * @param answers answers given to the question (may be empty)
 * @param url     full-length URL pointing at the question
 */
final case class Question(title: String, answers: Seq[Answer], url: String)
/** Marker for the payload types that may appear in the `data` field of a [[BitlyResponse]]. */
trait BitlyResponseData

/**
 * Payload of a URL-shorten response.
 *
 * Field names are snake_case on purpose: the play-json format derived for this class uses the
 * field names as JSON keys.
 */
final case class UrlShortenResponseData(url: String,
                                        hash: String,
                                        global_hash: String,
                                        long_url: String,
                                        new_hash: Int) extends BitlyResponseData

/**
 * Envelope of a response.
 *
 * @param status_code numeric status carried in the response body
 * @param status_txt  textual status carried in the response body
 * @param data        payload; modelled as optional, callers in this file fall back to the
 *                    original URL when it is absent
 * @tparam T concrete payload type
 */
final case class BitlyResponse[T <: BitlyResponseData](status_code: Int, status_txt: String, data: Option[T])
/**
 * play-json formats for the Bitly model, combined with the entity (un)marshallers inherited
 * from `PlayJsonSupport`.
 *
 * The implicit vals carry explicit types so that implicit resolution does not depend on the
 * type inferred from the macro expansion or on declaration order.
 */
trait JsonSupport extends PlayJsonSupport {
  implicit val urlShortenResponseDataFormat: Format[UrlShortenResponseData] =
    Json.format[UrlShortenResponseData]

  // Relies on urlShortenResponseDataFormat above for the nested `data` field.
  implicit val urlShortenResponseFormat: Format[BitlyResponse[UrlShortenResponseData]] =
    Json.format[BitlyResponse[UrlShortenResponseData]]
}
/**
 * Stream element wrapper that is either a real element ([[Element]]) or the end-of-input
 * marker ([[Complete]]).
 *
 * Sealed so the compiler can check pattern matches over the two cases for exhaustiveness.
 */
sealed trait CompleteBox[+A]

/** A regular stream element carrying `ele`. */
final case class Element[+A](ele: A) extends CompleteBox[A]

/** Marker meaning "the original upstream has finished". */
case object Complete extends CompleteBox[Nothing]
/**
 * Flow stage that wraps every incoming element in [[Element]] and, once upstream finishes,
 * emits a single [[Complete]] marker before completing itself.
 *
 * @tparam A type of the wrapped elements
 */
class CompleteBoxStage[A] extends GraphStage[FlowShape[A, CompleteBox[A]]] {
  val in: Inlet[A] = Inlet[A]("CompleteBox.in")
  val out: Outlet[CompleteBox[A]] = Outlet[CompleteBox[A]]("CompleteBox.out")
  override val shape: FlowShape[A, CompleteBox[A]] = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit =
          push(out, Element(grab(in)))

        // Upstream may finish while downstream has no outstanding demand. A bare `push` is
        // illegal in that state, so use `emit`, which pushes immediately when the port is
        // available and otherwise waits for the next pull; the stage completes only after
        // the marker has actually been emitted.
        override def onUpstreamFinish(): Unit =
          emit(out, Complete, () => completeStage())
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          pull(in)
      })
    }
}
/**
 * Mix-in that provides an implicit Akka `LogSource` for any `LogSupport` instance, so such an
 * instance can be passed as the log source to `Logging(system, this)`.
 */
trait LogSupport {

  /** Name reported as the log source; defaults to "demo". */
  def sourceName: String = "demo"

  // Explicitly typed so implicit resolution does not depend on the inferred anonymous type.
  implicit val logSource: LogSource[LogSupport] = new LogSource[LogSupport] {
    override def genString(t: LogSupport): String = t.sourceName
  }
}
| object Demo extends App with JsonSupport with LogSupport { | |
| import scala.concurrent.ExecutionContext.Implicits.global | |
/**
 * Sends a shorten request for `originUrl` to the Bitly v3 endpoint and pairs the raw HTTP
 * response with `context`, so the caller can correlate the reply with whatever triggered it.
 *
 * The access token is read from the `BITLY_ACCESS_TOKEN` environment variable when it is set;
 * otherwise the token that was previously hard-coded here is used, so existing behaviour is
 * unchanged.
 * NOTE(review): a credential committed to source should be rotated and the fallback removed.
 *
 * @param originUrl long URL to shorten
 * @param context   arbitrary value handed back unchanged next to the response
 * @return the HTTP response together with `context`; the response entity is not consumed here
 */
def urlShorten[T](originUrl: String, context: T)(implicit system: ActorSystem,
                                                 materializer: Materializer): Future[(HttpResponse, T)] = {
  val accessToken =
    sys.env.getOrElse("BITLY_ACCESS_TOKEN", "9398d66cef97b0fb8049e749595f2c8063262a12")
  val params = Map(
    "access_token" -> accessToken,
    "longUrl" -> originUrl,
    "format" -> "json")
  val uri = Uri("https://api-ssl.bitly.com/v3/shorten").withQuery(Uri.Query(params))
  val request = HttpRequest(method = HttpMethods.GET, uri = uri)
  Http().singleRequest(request).map(response => response -> context)
}
/**
 * Builds the flow that turns each [[Question]] into a printable text block in which the
 * question URL and every answer URL have been replaced by their shortened form (falling back
 * to the original URL when the service returned no data).
 *
 * Stages, in wiring order:
 *  - `a`, `b`, `c`: shorten the question URL via [[urlShorten]], read the short URL out of the
 *    reply and attach an empty list of processed answers.
 *  - `completeBoxStage`: wraps items in [[Element]] and appends a [[Complete]] marker once the
 *    upstream has finished.
 *  - `merge` ~> `e` ~> `bcast2`: the processing loop. Every pass through `e` shortens the URL
 *    of the next unprocessed answer of an item (at most one request start per second because
 *    of the throttle). Items that still have unprocessed answers are fed back through `g`
 *    into the preferred port of `merge`.
 *  - `h`, `i`, `j`: items whose answers are all processed leave the loop, the marker is
 *    dropped and the text is rendered.
 *  - `zip`, `k`: pairs what entered the loop (`bcast1`) with what left it (`bcast3`) and feeds
 *    a single marker back into `merge` when both sides yield [[Complete]] at the same
 *    position. NOTE(review): presumably this is what lets the cycle drain and terminate;
 *    confirm before changing the wiring.
 *
 * @return a graph with one `Question` inlet and one `String` outlet
 */
def processFlow(implicit system: ActorSystem,
                materializer: Materializer): Graph[FlowShape[Question, String], akka.NotUsed] = {
  type AnswerTuple2 = (Answer, Option[String])
  type QuestionTuple2 = (Question, Option[String])
  type QuestionTuple3 = (Question, Option[String], Seq[AnswerTuple2])
  type Boxed = CompleteBox[QuestionTuple3]
  // (previous, current) window maintained by the scan in the feedback branch.
  type CycleState = (Option[Boxed], Option[Boxed])

  import akka.stream.scaladsl.GraphDSL.Implicits._

  val log = Logging(system, this)
  val infoLevels = Attributes.logLevels(Logging.InfoLevel, Logging.InfoLevel, Logging.InfoLevel)

  // Logs the response entity and extracts the short URL from the reply, if it carries data.
  def shortUrlOf(response: HttpResponse): Future[Option[String]] = {
    log.info(response.entity.toString)
    Unmarshal(response.entity)
      .to[BitlyResponse[UrlShortenResponseData]]
      .map(_.data.map(_.url))
  }

  GraphDSL.create() { implicit builder =>
    val a = builder.add(
      Flow[Question]
        .log("in")
        .withAttributes(infoLevels)
        .mapAsync(5)(question => urlShorten(question.url, question)))

    val b = Flow[(HttpResponse, Question)].mapAsync(5) { case (response, question) =>
      shortUrlOf(response).map(shortUrl => (question, shortUrl))
    }

    val c = Flow[QuestionTuple2].map { case (question, shortUrl) =>
      (question, shortUrl, Seq.empty[AnswerTuple2])
    }

    val completeBoxStage = builder.add(new CompleteBoxStage[QuestionTuple3])
    val bcast1 = builder.add(Broadcast[Boxed](2))
    // Two secondary ports (fresh input and the marker from `k`) plus the preferred feedback
    // port; the second argument is kept as `false`, exactly as before.
    val merge = builder.add(MergePreferred[Boxed](2, false))

    val e = Flow[Boxed]
      .throttle(1, 1.second, 2, ThrottleMode.Shaping)
      .log("merge")
      .withAttributes(infoLevels)
      .mapAsync[Boxed](5) {
        case Element((question, shortUrl, processed)) if processed.size < question.answers.size =>
          // Exactly one answer is handled per pass: the first one not yet processed.
          val answer = question.answers(processed.size)
          urlShorten(answer.url, answer)
            .flatMap { case (response, _) => shortUrlOf(response) }
            .map(answerShortUrl => Element((question, shortUrl, processed :+ (answer -> answerShortUrl))))
        case finishedOrMarker =>
          // Nothing left to shorten (or the Complete marker): pass through without
          // scheduling a task on the execution context.
          Future.successful(finishedOrMarker)
      }

    val bcast2 = builder.add(Broadcast[Boxed](2))

    // Ends the feedback branch once the window pairs the marker with an item that needs
    // exactly one more pass; any other window keeps the branch running.
    val keepCycling: CycleState => Boolean = {
      case (Some(Complete), Some(Element((question, _, processed))))
          if question.answers.size == processed.size + 1 => false
      case (Some(Element((question, _, processed))), Some(Complete))
          if question.answers.size == processed.size + 1 => false
      case _ => true
    }

    val g = Flow[Boxed]
      .filter {
        case Element((question, _, processed)) => processed.size < question.answers.size
        case Complete => true
      }
      .scan[CycleState]((None, None)) { (window, next) =>
        window match {
          // Once the first slot holds the marker it stays there.
          case (marker @ Some(Complete), _) => (marker, Some(next))
          case (_, current) => (current, Some(next))
        }
      }
      .log("cycle")
      .withAttributes(infoLevels)
      // `true`: the element that fails the predicate is still emitted before the branch ends.
      .takeWhile(keepCycling, true)
      // Only real elements in the current slot go back into the loop.
      .collect[Boxed] { case (_, Some(element @ Element(_))) => element }

    val h = Flow[Boxed].filter {
      case Element((question, _, processed)) => processed.size == question.answers.size
      case Complete => true
    }

    val bcast3 = builder.add(Broadcast[Boxed](2))
    val zip = builder.add(Zip[Boxed, Boxed]())

    val k = Flow[(Boxed, Boxed)]
      .collect[Boxed] { case (Complete, Complete) => Complete }
      .take(1)

    val i = Flow[Boxed].collect { case Element(payload) => payload }

    val j = builder.add(Flow[QuestionTuple3].map { case (question, questionShortUrl, answers) =>
      val shortUrl = questionShortUrl.getOrElse(question.url)
      val answerText = answers.zipWithIndex
        .map { case ((answer, answerShortUrl), index) =>
          s"A${index + 1}: ${answer.content} See more: ${answerShortUrl.getOrElse(answer.url)}"
        }
        .mkString("\n")
      s"Q: ${question.title} (${shortUrl})\n${answerText}"
    })

    // FORMAT: OFF
    a ~> b ~> c ~> completeBoxStage ~> bcast1 ~> merge ~> e ~> bcast2 ~> h ~> bcast3 ~> i ~> j
                                                 merge.preferred <~ g <~ bcast2
                                       bcast1 ~> zip.in0
                                                 zip.in1 <~ bcast3
                                                 merge <~ k <~ zip.out
    // FORMAT: ON

    FlowShape(a.in, j.out)
  }
}
/**
 * Shuts down the HTTP connection pools, then the materializer, then the actor system.
 *
 * A failure while shutting down the connection pools is reported on stderr and otherwise
 * ignored, so the materializer and the actor system are always stopped. Without this, the
 * application would block forever waiting for the system to terminate.
 *
 * @return future completed when the actor system has terminated
 */
def shutDown(implicit system: ActorSystem, materializer: ActorMaterializer): Future[akka.actor.Terminated] =
  Http()
    .shutdownAllConnectionPools()
    .recover {
      case scala.util.control.NonFatal(e) =>
        Console.err.println(s"Failed to shut down HTTP connection pools: $e")
    }
    .flatMap { _ =>
      materializer.shutdown()
      system.terminate()
    }
/**
 * Generates `questionCount` sample questions, numbered from 1, each with a random number
 * (0 to 9) of numbered answers under `http://www.example.com`.
 *
 * @param questionCount number of questions to create; values below 1 yield an empty list
 */
def createMockData(questionCount: Int) =
  List.tabulate(questionCount) { questionIndex =>
    val questionNo = questionIndex + 1
    val answerCount = Random.nextInt(10)
    val answers = List.tabulate(answerCount) { answerIndex =>
      val answerNo = answerIndex + 1
      Answer(s"Answer ${answerNo}", s"http://www.example.com/q${questionNo}/a${answerNo}")
    }
    Question(s"Question ${questionNo}", answers, s"http://www.example.com/q${questionNo}")
  }
// Demo input: ten generated questions, printed one text block per question.
val questions = createMockData(10)
val source = Source(questions)
val sink = Sink.foreach[String] { text =>
  println(text)
  println()
}

implicit val system: ActorSystem = ActorSystem("demo")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

// At most one question per second enters the processing flow. `queue` is the source's
// materialized value and `done` completes when the sink has consumed the whole stream.
val (queue, done) = source
  .throttle(1, 1.second, 2, ThrottleMode.Shaping)
  .via(processFlow)
  .toMat(sink)(Keep.both)
  .run()

// Report the outcome, then release all resources regardless of success or failure.
done.onComplete { outcome =>
  outcome match {
    case Success(_) => println("Success")
    case Failure(e) => println(e)
  }
  shutDown
}

// Also terminate the actor system when the JVM is asked to exit (e.g. Ctrl-C).
sys.addShutdownHook(system.terminate())
// Keep the main thread alive until the actor system has terminated.
Await.result(system.whenTerminated, Duration.Inf)
| } | |
/** Default instance of the `PlayJsonSupport` trait; also hosts its error type. */
object PlayJsonSupport extends PlayJsonSupport {

  /**
   * Exception wrapping a play-json validation failure.
   *
   * @param error the validation errors; the exception message is their JSON rendering
   */
  final case class PlayJsonError(error: JsError) extends RuntimeException {
    override def getMessage: String = Json.stringify(JsError.toJson(error))
  }
}
/**
 * Akka HTTP entity marshalling and unmarshalling backed by whatever *play-json* `Reads` /
 * `Writes` instances are in implicit scope.
 */
trait PlayJsonSupport {
  import MediaTypes._
  import PlayJsonSupport._

  /** Content types accepted when decoding an entity; `application/json` unless overridden. */
  def unmarshallerContentTypes: Seq[ContentTypeRange] =
    List(`application/json`)

  // Entity bytes => text, using the charset announced by the entity. An empty body is
  // reported as "no content" instead of being handed to the JSON parser.
  private val jsonStringUnmarshaller =
    Unmarshaller.byteStringUnmarshaller
      .forContentTypes(unmarshallerContentTypes: _*)
      .mapWithCharset { (bytes, charset) =>
        if (bytes.isEmpty) throw Unmarshaller.NoContentException
        else bytes.decodeString(charset.nioCharset.name)
      }

  private val jsonStringMarshaller = Marshaller.stringMarshaller(`application/json`)

  /**
   * HTTP entity => `A`.
   *
   * A body that parses as JSON but does not validate against `Reads[A]` is signalled as a
   * `RejectionError` wrapping a `ValidationRejection` whose cause is a `PlayJsonError`.
   *
   * @tparam A type to decode
   * @return unmarshaller for `A`
   */
  implicit def unmarshaller[A: Reads]: FromEntityUnmarshaller[A] =
    jsonStringUnmarshaller.map { text =>
      implicitly[Reads[A]].reads(Json.parse(text)) match {
        case JsSuccess(value, _) => value
        case failure: JsError =>
          throw RejectionError(
            ValidationRejection(JsError.toJson(failure).toString, Some(PlayJsonError(failure)))
          )
      }
    }

  /**
   * `A` => HTTP entity.
   *
   * @param writes  converts the value to a JSON tree
   * @param printer renders the JSON tree as text; pretty-prints by default
   * @tparam A type to encode
   * @return marshaller for any `A` value
   */
  implicit def marshaller[A](
      implicit writes: Writes[A],
      printer: JsValue => String = Json.prettyPrint
  ): ToEntityMarshaller[A] =
    jsonStringMarshaller.compose[A](value => printer(writes.writes(value)))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment