Skip to content

Instantly share code, notes, and snippets.

@ufosky
Last active July 25, 2017 13:01
Show Gist options
  • Select an option

  • Save ufosky/faa6b37b4eaaf8ab468f98663c314b8f to your computer and use it in GitHub Desktop.

Select an option

Save ufosky/faa6b37b4eaaf8ab468f98663c314b8f to your computer and use it in GitHub Desktop.
import akka.actor.ActorSystem
import akka.event.{LogSource, Logging}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.{Marshaller, ToEntityMarshaller}
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{RejectionError, ValidationRejection}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshal, Unmarshaller}
import akka.stream._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergePreferred, Sink, Source, Zip}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import play.api.libs.json._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Random, Success}
/** One answer belonging to a [[Question]].
  *
  * @param content answer text
  * @param url     URL associated with the answer
  */
case class Answer(
  content: String,
  url: String
)

/** A question together with its answers.
  *
  * @param title   question title
  * @param answers answers attached to the question (may be empty)
  * @param url     URL associated with the question
  */
case class Question(
  title: String,
  answers: Seq[Answer],
  url: String
)
/** Marker for payload types that can appear in the `data` field of a [[BitlyResponse]]. */
trait BitlyResponseData

/** Payload of a URL-shorten response.
  *
  * NOTE(review): the snake_case field names presumably mirror the JSON keys of the
  * remote API so that a derived play-json format lines up with them - confirm
  * against the API documentation.
  */
case class UrlShortenResponseData(
  url: String,
  hash: String,
  global_hash: String,
  long_url: String,
  new_hash: Int
) extends BitlyResponseData

/** Response envelope: a status code and text plus an optional typed payload.
  *
  * @tparam T payload type carried in `data`
  */
case class BitlyResponse[T <: BitlyResponseData](
  status_code: Int,
  status_txt: String,
  data: Option[T]
)
/** play-json formats for the Bitly response types.
  *
  * Extends [[PlayJsonSupport]], so mixing this trait in also brings that trait's
  * implicit akka-http marshaller/unmarshaller definitions into scope.
  */
trait JsonSupport extends PlayJsonSupport {
  // Implicit definitions carry explicit result types so implicit resolution
  // does not depend on the type inferred from the macro expansion.
  implicit val urlShortenResponseDataFormat: OFormat[UrlShortenResponseData] =
    Json.format[UrlShortenResponseData]

  // Declared after the payload format because deriving the envelope format
  // needs an implicit format for `UrlShortenResponseData`.
  implicit val urlShortenResponseFormat: OFormat[BitlyResponse[UrlShortenResponseData]] =
    Json.format[BitlyResponse[UrlShortenResponseData]]
}
/** A stream element wrapper that can also represent an in-band "upstream is done" marker.
  *
  * Sealed so the compiler can check pattern matches over [[Element]] / [[Complete]]
  * for exhaustiveness.
  */
sealed trait CompleteBox[+A]

/** A regular element.
  *
  * @param ele the wrapped value
  */
final case class Element[+A](ele: A) extends CompleteBox[A]

/** The completion marker; carries no value. */
case object Complete extends CompleteBox[Nothing]
/** Flow stage that wraps every incoming element in [[Element]] and, when upstream
  * finishes, sends one final [[Complete]] marker downstream before completing.
  *
  * @tparam A element type of the upstream
  */
class CompleteBoxStage[A] extends GraphStage[FlowShape[A, CompleteBox[A]]] {
  val in = Inlet[A]("CompleteBox.in")
  val out = Outlet[CompleteBox[A]]("CompleteBox.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      /** Sends the `Complete` marker and then completes the stage.
        *
        * Upstream may finish while `out` has not been pulled (for example right
        * after the last element was pushed); calling `push` in that state is
        * illegal. `emit` pushes immediately when `out` is available and otherwise
        * waits for the next pull, running the continuation afterwards.
        */
      def onComplete(): Unit =
        emit(out, Complete, () => completeStage())

      setHandler(in, new InHandler {
        override def onPush(): Unit =
          push(out, Element(grab(in)))

        override def onUpstreamFinish(): Unit = onComplete()
      })

      setHandler(out, new OutHandler {
        // Demand is forwarded one-to-one to upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}
/** Supplies an implicit akka `LogSource` whose log-source string is [[sourceName]]. */
trait LogSupport {

  /** Name reported as the log source; `"demo"` unless overridden. */
  def sourceName: String = "demo"

  implicit val logSource: LogSource[LogSupport] = new LogSource[LogSupport] {
    override def genString(owner: LogSupport): String = owner.sourceName
  }
}
object Demo extends App with JsonSupport with LogSupport {
import scala.concurrent.ExecutionContext.Implicits.global
/** Issues a GET request to the Bitly v3 `shorten` endpoint for `originUrl`.
  *
  * The access token is read from the `BITLY_ACCESS_TOKEN` environment variable;
  * when that variable is not set, the token that was previously hard-coded here
  * is used so existing behaviour is unchanged.
  *
  * @param originUrl URL to shorten
  * @param context   arbitrary value handed back unchanged next to the response,
  *                  so callers can correlate the response with its input
  * @return the raw HTTP response paired with `context`
  */
def urlShorten[T](originUrl: String, context: T)(implicit system: ActorSystem, materializer: Materializer): Future[(HttpResponse, T)] = {
  // NOTE(review): a credential committed to source control should be revoked;
  // once it is, drop this fallback and require the environment variable.
  val accessToken = sys.env.getOrElse("BITLY_ACCESS_TOKEN", "9398d66cef97b0fb8049e749595f2c8063262a12")
  val params = Map("access_token" -> accessToken,
    "longUrl" -> originUrl,
    "format" -> "json")
  val uri = Uri("https://api-ssl.bitly.com/v3/shorten").withQuery(Uri.Query(params))
  val request = HttpRequest(method = HttpMethods.GET, uri = uri)
  Http().singleRequest(request).map(_ -> context)
}
/** Builds the flow graph that turns a [[Question]] into a printable text block in
  * which the question URL and every answer URL have been passed through
  * [[urlShorten]].
  *
  * Stages, in wiring order:
  *  - `a`, `b`: request a short URL for the question and decode the response.
  *  - `c`: attach an empty "processed answers" list.
  *  - `completeBoxStage`: wrap elements in `Element` and append a `Complete` marker.
  *  - `merge` -> `e`: throttled step that shortens the next unprocessed answer of an element.
  *  - `g` (feedback into `merge.preferred`): elements that still have unprocessed answers.
  *  - `h`: elements whose answers are all processed, plus the `Complete` marker.
  *  - `zip` -> `k`: the first `(Complete, Complete)` pair is fed back into `merge` once.
  *  - `i`, `j`: drop the marker, unwrap, and render the text.
  *
  * @return a graph with shape `FlowShape[Question, String]`
  */
def processFlow(implicit system: ActorSystem, materializer: Materializer) = {
  type AnswerTuple2 = (Answer, Option[String])
  type QuestionTuple2 = (Question, Option[String])
  type QuestionTuple3 = (Question, Option[String], Seq[AnswerTuple2])
  type Boxed = CompleteBox[QuestionTuple3]

  import akka.stream.scaladsl.GraphDSL.Implicits._

  val log = Logging(system, this)
  // Every `.log(...)` stage below reports elements, completion and failure at info level.
  val infoLevels = Attributes.logLevels(Logging.InfoLevel, Logging.InfoLevel, Logging.InfoLevel)

  // Logs the response entity and decodes it, yielding the short URL if the
  // response carried a `data` payload.
  def shortUrlOf(response: HttpResponse): Future[Option[String]] = {
    log.info(response.entity.toString)
    Unmarshal(response.entity)
      .to[BitlyResponse[UrlShortenResponseData]]
      .map(_.data.map(_.url))
  }

  GraphDSL.create() { implicit builder =>
    val a = builder.add(
      Flow[Question]
        .log("in")
        .withAttributes(infoLevels)
        .mapAsync(5)(question => urlShorten(question.url, question)))

    val b = Flow[(HttpResponse, Question)].mapAsync(5) { case (response, question) =>
      shortUrlOf(response).map(shortUrl => (question, shortUrl))
    }

    val c = Flow[QuestionTuple2].map { case (question, shortUrl) =>
      (question, shortUrl, Seq.empty[AnswerTuple2])
    }

    val completeBoxStage = builder.add(new CompleteBoxStage[QuestionTuple3])
    val bcast1 = builder.add(Broadcast[Boxed](2))
    val merge = builder.add(MergePreferred[Boxed](2, false))

    // One pass of the loop: shorten the URL of the next unprocessed answer.
    val e = Flow[Boxed]
      .throttle(1, 1.second, 2, ThrottleMode.Shaping)
      .log("merge")
      .withAttributes(infoLevels)
      .mapAsync[Boxed](5) {
        case Element(state @ (question, shortUrl, processed)) =>
          if (question.answers.isEmpty) {
            // Nothing to shorten; the value is already computed, so no task is scheduled.
            Future.successful(Element(state))
          } else {
            // The number of processed answers is the index of the next one to handle.
            val answer = question.answers(processed.size)
            urlShorten(answer.url, answer)
              .flatMap { case (response, sameAnswer) =>
                shortUrlOf(response).map(answerShortUrl => (sameAnswer, answerShortUrl))
              }
              .map(shortened => Element((question, shortUrl, processed :+ shortened)))
          }
        case Complete => Future.successful(Complete)
      }

    val bcast2 = builder.add(Broadcast[Boxed](2))

    // Feedback edge: keeps elements with unprocessed answers (and the marker),
    // tracks a (previous, current) pair, and emits only the `Element`s.
    // NOTE(review): the scan/takeWhile pair appears intended to close the
    // feedback edge once the last pending element has gone round after the
    // `Complete` marker was seen - confirm before changing.
    val g = Flow[Boxed]
      .filter {
        case Element((question, _, processed)) => processed.size < question.answers.size
        case Complete => true
      }
      .scan((Option.empty[Boxed], Option.empty[Boxed])) { (acc, next) =>
        val (pre, cur) = acc
        pre match {
          // Once `Complete` sits in the first slot it stays there.
          case Some(Complete) => (pre, Some(next))
          case _ => (cur, Some(next))
        }
      }
      .log("cycle")
      .withAttributes(infoLevels)
      .takeWhile({ pair =>
        pair match {
          case (Some(Complete), Some(Element((question, _, processed)))) if question.answers.size == processed.size + 1 => false
          case (Some(Element((question, _, processed))), Some(Complete)) if question.answers.size == processed.size + 1 => false
          case _ => true
        }
      }, true) // inclusive: the pair that ends the loop is still emitted
      .collect { case (_, Some(element @ Element(_))) => element: Boxed }

    // Exit edge: elements whose answers have all been processed, plus the marker.
    val h = Flow[Boxed].filter {
      case Element((question, _, processed)) => processed.size == question.answers.size
      case Complete => true
    }

    val bcast3 = builder.add(Broadcast[Boxed](2))
    val zip = builder.add(Zip[Boxed, Boxed]())

    // Lets through only the first pair in which both zipped sides are `Complete`.
    val k = Flow[(Boxed, Boxed)]
      .collect { case (Complete, Complete) => Complete: Boxed }
      .take(1)

    // Drops the marker and unwraps the remaining elements.
    val i = Flow[Boxed].collect { case Element(finished) => finished }

    val j = builder.add(Flow[QuestionTuple3].map { case (question, shortUrl, answers) =>
      val answerText = answers.zipWithIndex
        .map { case ((answer, answerShortUrl), index) =>
          // Falls back to the original URL when no short URL was obtained.
          s"A${index + 1}: ${answer.content} See more: ${answerShortUrl.getOrElse(answer.url)}"
        }
        .mkString("\n")
      s"Q: ${question.title} (${shortUrl.getOrElse(question.url)})\n${answerText}"
    })

    // FORMAT: OFF
    a ~> b ~> c ~> completeBoxStage ~> bcast1 ~> merge ~> e ~> bcast2 ~> h ~> bcast3 ~> i ~> j
    merge.preferred <~ g <~ bcast2
    bcast1 ~> zip.in0
    zip.in1 <~ bcast3
    merge <~ k <~ zip.out
    // FORMAT: ON
    FlowShape(a.in, j.out)
  }
}
/** Shuts down all HTTP connection pools, then the materializer and the actor system.
  *
  * The materializer and the system are only touched after the connection pools
  * have finished shutting down successfully.
  *
  * @return the future returned by `system.terminate()`, sequenced after the pool shutdown
  */
def shutDown(implicit system: ActorSystem, materializer: ActorMaterializer) =
  for {
    _ <- Http().shutdownAllConnectionPools()
    terminated <- {
      materializer.shutdown()
      system.terminate()
    }
  } yield terminated
/** Generates `questionCount` sample questions, numbered from 1.
  *
  * Each question gets `Random.nextInt(10)` answers (so between 0 and 9), with
  * titles, contents and example.com URLs derived from the 1-based numbers.
  *
  * @param questionCount number of questions to generate
  * @return the generated questions in ascending order
  */
def createMockData(questionCount: Int) =
  List.tabulate(questionCount) { questionIdx =>
    val answerCount = Random.nextInt(10)
    val answers = List.tabulate(answerCount) { answerIdx =>
      Answer(
        s"Answer ${answerIdx + 1}",
        s"http://www.example.com/q${questionIdx + 1}/a${answerIdx + 1}")
    }
    Question(
      s"Question ${questionIdx + 1}",
      answers,
      s"http://www.example.com/q${questionIdx + 1}")
  }
// --- Program body: build mock input, run it through `processFlow`, print the results ---

val questions = createMockData(10)
val source = Source(questions)

// Prints each rendered question followed by a blank line.
val sink = Sink.foreach[String] { r =>
  println(r)
  println()
}

// Explicit types on the implicits keep resolution independent of inference.
implicit val system: ActorSystem = ActorSystem("demo")
implicit val materializer: ActorMaterializer = ActorMaterializer.create(system)

// Only the sink's completion future is of interest; the source side
// materializes nothing useful, so `Keep.right` is used.
val done = source
  .throttle(1, 1.second, 2, ThrottleMode.Shaping)
  .via(processFlow)
  .toMat(sink)(Keep.right)
  .run()

// Report the outcome, then shut everything down in either case.
done.onComplete { result =>
  result match {
    case Success(_) => println("Success")
    case Failure(e) => println(e)
  }
  shutDown
}

sys.addShutdownHook(system.terminate())
// Keep the main thread alive until the actor system has terminated.
Await.result(system.whenTerminated, Duration.Inf)
}
/** Companion that exposes the [[PlayJsonSupport]] implicits for import and
  * defines the error type attached to rejected payloads.
  */
object PlayJsonSupport extends PlayJsonSupport {

  /** Exception wrapper around a play-json `JsError`.
    *
    * @param error the validation error; its JSON rendering is used as the message
    */
  final case class PlayJsonError(error: JsError) extends RuntimeException {
    override def getMessage: String = {
      val rendered = JsError.toJson(error)
      rendered.toString()
    }
  }
}
/**
  * Automatic to and from JSON marshalling/unmarshalling using an in-scope *play-json* protocol.
  */
trait PlayJsonSupport {
  import MediaTypes._
  import PlayJsonSupport._

  /** Content types accepted by the unmarshaller; `application/json` unless overridden. */
  def unmarshallerContentTypes: Seq[ContentTypeRange] =
    List(`application/json`)

  // Entity bytes => String, restricted to the accepted content types and decoded
  // with the charset of the entity. An empty body is reported as "no content".
  private val jsonStringUnmarshaller =
    Unmarshaller.byteStringUnmarshaller
      .forContentTypes(unmarshallerContentTypes: _*)
      .mapWithCharset { (bytes, charset) =>
        if (bytes.isEmpty) throw Unmarshaller.NoContentException
        else bytes.decodeString(charset.nioCharset.name)
      }

  private val jsonStringMarshaller = Marshaller.stringMarshaller(`application/json`)

  /**
    * HTTP entity => `A`
    *
    * Parses the entity text as JSON and validates it with the implicit `Reads[A]`.
    * A validation failure is thrown as a `RejectionError` wrapping a
    * `ValidationRejection` whose cause is a `PlayJsonError`.
    *
    * @tparam A type to decode
    * @return unmarshaller for `A`
    */
  implicit def unmarshaller[A: Reads]: FromEntityUnmarshaller[A] =
    jsonStringUnmarshaller.map { text =>
      implicitly[Reads[A]].reads(Json.parse(text)) match {
        case JsSuccess(value, _) => value
        case failure: JsError =>
          throw RejectionError(
            ValidationRejection(JsError.toJson(failure).toString, Some(PlayJsonError(failure)))
          )
      }
    }

  /**
    * `A` => HTTP entity
    *
    * Converts the value to JSON with `writes`, renders it with `printer`, and
    * marshals the resulting string as `application/json`.
    *
    * @tparam A type to encode
    * @return marshaller for any `A` value
    */
  implicit def marshaller[A](
    implicit writes: Writes[A],
    printer: JsValue => String = Json.prettyPrint
  ): ToEntityMarshaller[A] =
    jsonStringMarshaller.compose((value: A) => printer(writes.writes(value)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment