package akka.unfiltered import akka.actor._ import akka.dispatch.Future import akka.pattern.ask import akka.util.duration._ import akka.util.Timeout import unfiltered.Async import unfiltered.request._ import unfiltered.response._ import unfiltered.netty._ object AkkaUnfilteredSample extends App { val system = ActorSystem("sample") val accountActor = system.actorOf(Props[AccountActor]) implicit val timeout = Timeout(1 second) object Route extends async.Plan with ServerErrorResponse { def textResponse(content: String) = PlainTextContent ~> ResponseString(content + "\r\n") def actorResponse[A](body: => Future[ResponseFunction[A]])(implicit responder: Async.Responder[A]) { body onComplete { case Right(rf) => responder.respond(rf) case _ => // You should do something about the error here, but this is just a simple example ;) responder.respond(RequestTimeout) } } def intent = { case req => implicit val responder = req req match { case GET(Path("/ping")) => responder.respond(textResponse("Pong @ " + System.currentTimeMillis)) case GET(Path(Seg("account" :: "statement" :: accountId :: Nil))) => actorResponse { (accountActor ask Status(accountId)).mapTo[Int].map { r => if (r > 0) textResponse("Account total: " + r) else BadRequest ~> textResponse("Unknown account: " + accountId) } } case POST(Path("/account/deposit")) & Params(params) => bindParams(params) { (accountId, amount) => actorResponse { (accountActor ask Deposit(accountId, amount)).mapTo[Int].map { r => textResponse("Updated account total: " + r) } } } case POST(Path("/account/withdraw")) & Params(params) => bindParams(params) { (accountId, amount) => actorResponse { (accountActor ask Withdraw(accountId, amount)).mapTo[Int].map { r => if (r > 0) textResponse("Updated account total: " + r) else BadRequest ~> textResponse("Insufficient funds. Get your act together.") } } } case _ => Pass } } def bindParams[A](params: Params.Map)(success: (String, Int) => Unit)(implicit responder: Async.Responder[A]) { import QParams._ val expected = for { accountId <- lookup("accountId") is required("accountId is missing") is trimmed is nonempty("accountId is empty") amount <- lookup("amount") is required("amount is missing") is int(s => "'%s' is not an integer".format(s)) is pred(a => a >= 1, _ => "amount must be >= 1") } yield accountId.get -> amount.get expected(params) match { case Right((accountId, amount)) => success(accountId, amount) case Left(log) => val err = log.map(f => "%s %s".format(f.name, f.error)).mkString("", ", ", "\r\n") responder.respond(BadRequest ~> PlainTextContent ~> ResponseString(err)) } } } case class Status(accountId: String) case class Deposit(accountId: String, amount: Int) case class Withdraw(accountId: String, amount: Int) class AccountActor extends Actor { var accounts = Map.empty[String, Int] def receive = { case Status(accountId) => sender ! accounts.getOrElse(accountId, -1) case Deposit(accountId, amount) => sender ! deposit(accountId, amount) case Withdraw(accountId, amount) => sender ! withdraw(accountId, amount) } def deposit(accountId: String, amount: Int): Int = { accounts.get(accountId) match { case Some(value) => val newValue = value + amount accounts += accountId -> newValue newValue case _ => accounts += accountId -> amount amount } } def withdraw(accountId: String, amount: Int): Int = { accounts.get(accountId) match { case Some(value) => if (value < amount) -1 else { val newValue = value - amount accounts += accountId -> newValue newValue } case _ => -1 } } } val http = Http(9000).plan(Route).start() println("Server listening on port: 9000. Press any key to exit...") System.in.read() http.stop() system.shutdown() }