Skip to content

Instantly share code, notes, and snippets.

@scullxbones
Created May 6, 2012 14:50
Show Gist options
  • Select an option

  • Save scullxbones/2622718 to your computer and use it in GitHub Desktop.

Select an option

Save scullxbones/2622718 to your computer and use it in GitHub Desktop.
System test for durable mailboxes + circuit breaker
package akka.actor.mailbox
import akka.event.Logging
import akka.util.Duration
import com.typesafe.config.ConfigFactory
import akka.actor.{ActorRef, Props, ActorSystem, Actor}
/**
* System test for the durable mailbox implementations in akka.actor.mailbox.
* Date: 5/5/12
* Time: 2:30 PM
*/
object DurableSystemTest {
  // HOCON configuration handed to the actor system in `main`: debug logging,
  // connection settings for each durable mailbox backend, and one dispatcher
  // per mailbox type. The literal is kept verbatim.
  val config = """
DurableSystemTest {
akka {
loglevel = DEBUG
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
}
}
}
}
akka {
loglevel = DEBUG
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
}
mailbox {
beanstalk {
hostname = "192.168.1.84"
}
mongodb {
uri = "mongodb://192.168.1.84/akka.mailbox"
}
redis {
hostname = "192.168.1.84"
}
zookeeper {
server-addresses = "192.168.1.84:2181"
}
}
}
}
beanstalk-dispatcher {
mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType
}
file-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
}
mongo-dispatcher {
mailbox-type = akka.actor.mailbox.MongoBasedMailboxType
}
redis-dispatcher {
mailbox-type = akka.actor.mailbox.RedisBasedMailboxType
}
zk-dispatcher {
mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType
}
"""

  /**
   * Boots the "DurableSystemTest" actor system from `config`, creates the
   * top-level "distributor" actor and schedules the string "Test Message"
   * to be sent to it, starting immediately and repeating every 10 seconds.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    import akka.util.duration._

    val system = ActorSystem("DurableSystemTest", ConfigFactory.parseString(config))
    val distributor = system.actorOf(Props[Distributor], name = "distributor")

    // The value returned by schedule is not kept, so the job is never cancelled here.
    system.scheduler.schedule(Duration.Zero, 10.seconds, distributor, "Test Message")
  }
}
/**
 * Actor that logs every message it receives, together with its own path and
 * the id of the dispatcher it runs on, and then blocks its thread for
 * `millis` milliseconds.
 */
class SimpleActor extends Actor {
  import akka.util.duration._

  val log = Logging(context.system, this)

  // How long each message blocks the actor: one second, in milliseconds.
  val millis: Long = 1.seconds.toMillis

  def receive = {
    case text: String => logThenBlock(" Received message " + text)
    case other => logThenBlock(" Received unknown message " + other.toString())
  }

  // Writes one info line (path, dispatcher id, then `suffix`) and sleeps for `millis`.
  private def logThenBlock(suffix: String): Unit = {
    log.info(self.path + ": dispatcherid=" + context.dispatcher.id + suffix)
    Thread.sleep(millis)
  }
}
/**
 * Actor that forwards every message it receives to each actor in `actors`.
 *
 * The mongo, beanstalk and redis children are all created, but only the
 * children listed in `actors` (currently just `redisActor`) get messages.
 */
class Distributor extends Actor {
  // One child per mailbox dispatcher; disabled variants stay commented out.
  //val localActor = system.actorOf(Props[SimpleActor], name="localMailbox")
  val mongoActor =
    context.actorOf(Props[SimpleActor].withDispatcher("mongo-dispatcher"), name = "mongoMailbox")
  //val fileActor = distributor.actorOf(Props[SimpleActor].withDispatcher("file-dispatcher"), name="fileMailbox")
  val beanstalkActor =
    context.actorOf(Props[SimpleActor].withDispatcher("beanstalk-dispatcher"), name = "beanstalkMailbox")
  val redisActor =
    context.actorOf(Props[SimpleActor].withDispatcher("redis-dispatcher"), name = "redisMailbox")
  //val zkActor = system.actorOf(Props[SimpleActor].withDispatcher("zk-dispatcher"), name="zookeeperMailbox")

  // Forwarding targets; uncomment an entry to include that child.
  val actors = List(
    //localActor,
    //mongoActor,
    //fileActor,
    //beanstalkActor,
    redisActor
  )

  def receive = {
    // forward (rather than tell) keeps the original sender on the message.
    case message =>
      for (target <- actors) target.forward(message)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment