Skip to content

Instantly share code, notes, and snippets.

@scullxbones
Created May 7, 2012 01:38
Show Gist options
  • Select an option

  • Save scullxbones/2625351 to your computer and use it in GitHub Desktop.

Select an option

Save scullxbones/2625351 to your computer and use it in GitHub Desktop.
OOM upon Redis client attempt to reconnect
package akka.actor.mailbox
import akka.event.Logging
import akka.util.Duration
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.actor.SupervisorStrategy.Restart
/**
* akka.actor.mailbox
* Date: 5/5/12
* Time: 2:30 PM
*/
object DurableSystemTest {
val config = """
DurableSystemTest {
akka {
loglevel = DEBUG
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
}
}
}
}
akka {
log-config-on-start = on
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
loglevel = "DEBUG"
actor {
debug {
receive = on
autoreceive = on
lifecycle = on
}
mailbox {
beanstalk {
hostname = "192.168.1.84"
}
mongodb {
uri = "mongodb://192.168.1.84/akka.mailbox"
}
redis {
hostname = "192.168.1.84"
}
zookeeper {
server-addresses = "192.168.1.84:2181"
}
}
}
}
beanstalk-dispatcher {
mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType
}
file-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
}
mongo-dispatcher {
mailbox-type = akka.actor.mailbox.MongoBasedMailboxType
}
redis-dispatcher {
mailbox-type = akka.actor.mailbox.RedisBasedMailboxType
}
zk-dispatcher {
mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType
}
"""
def main(args: Array[String]): Unit = {
val _config = ConfigFactory.parseString(config)
val system = ActorSystem("DurableSystemTest",_config)
val distributor = system.actorOf(Props[Distributor], name="distributor")
import akka.util.duration._
system.scheduler.schedule(Duration.Zero,10 seconds, distributor, "Test Message")
}
}
class SimpleActor extends Actor {
val log = Logging(context.system,this)
// Block long time
import akka.util.duration._
val millis : Long = 1.seconds.toMillis
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 30 seconds) {
case _ ⇒ Restart
}
def receive = {
case s:String => log.info(self.path+": dispatcherid="+context.dispatcher.id+" Received message "+s); Thread.sleep(millis)
case o => log.info(self.path+": dispatcherid="+context.dispatcher.id+" Received unknown message "+o.toString()); Thread.sleep(millis)
}
}
class Distributor extends Actor {
val mongoActor = context.actorOf(Props[SimpleActor].withDispatcher("mongo-dispatcher"), name="mongoMailbox")
val fileActor = context.actorOf(Props[SimpleActor].withDispatcher("file-dispatcher"), name="fileMailbox")
val beanstalkActor = context.actorOf(Props[SimpleActor].withDispatcher("beanstalk-dispatcher"), name="beanstalkMailbox")
val redisActor = context.actorOf(Props[SimpleActor].withDispatcher("redis-dispatcher"), name="redisMailbox")
val zkActor = context.actorOf(Props[SimpleActor].withDispatcher("zk-dispatcher"), name="zookeeperMailbox")
val actors = //localActor ::
//mongoActor ::
//fileActor ::
//beanstalkActor ::
redisActor ::
//zkActor ::
Nil
import akka.util.duration._
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 30 seconds) {
case _ ⇒ Restart
}
def receive = {
case m => actors.foreach(_.forward(m))
}
}
java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.ArrayBuilder$ofByte.mkArray(ArrayBuilder.scala:114)
at scala.collection.mutable.ArrayBuilder$ofByte.resize(ArrayBuilder.scala:120)
at scala.collection.mutable.ArrayBuilder$ofByte.ensureSize(ArrayBuilder.scala:132)
at scala.collection.mutable.ArrayBuilder$ofByte.$plus$eq(ArrayBuilder.scala:137)
at com.redis.IO$class.readLine(IO.scala:88)
at com.redis.RedisClient.readLine(RedisClient.scala:41)
at com.redis.Reply$class.receive(RedisProtocol.scala:101)
at com.redis.RedisClient.receive(RedisClient.scala:41)
at com.redis.R$class.asInt(RedisProtocol.scala:114)
at com.redis.RedisClient.asInt(RedisClient.scala:41)
at com.redis.ListOperations$$anonfun$rpush$1.apply(ListOperations.scala:15)
at com.redis.ListOperations$$anonfun$rpush$1.apply(ListOperations.scala:15)
at com.redis.Redis$class.send(RedisClient.scala:19)
at com.redis.RedisClient.send(RedisClient.scala:41)
at com.redis.ListOperations$class.rpush(ListOperations.scala:15)
at com.redis.RedisClient.rpush(RedisClient.scala:41)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1$$anonfun$apply$1$$anonfun$apply$2.apply(RedisBasedMailbox.scala:39)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1$$anonfun$apply$1$$anonfun$apply$2.apply(RedisBasedMailbox.scala:38)
at com.redis.RedisClientPool.withClient(Pool.scala:32)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1$$anonfun$apply$1.apply(RedisBasedMailbox.scala:38)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1$$anonfun$apply$1.apply(RedisBasedMailbox.scala:38)
at akka.actor.mailbox.RedisBasedMessageQueue.akka$actor$mailbox$RedisBasedMessageQueue$$withErrorHandling(RedisBasedMailbox.scala:75)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1.apply(RedisBasedMailbox.scala:37)
at akka.actor.mailbox.RedisBasedMessageQueue$$anonfun$enqueue$1.apply(RedisBasedMailbox.scala:37)
at akka.actor.mailbox.CircuitBreaker$CircuitBreakerState$class.exceedsDeadline(CircuitBreaker.scala:135)
at akka.actor.mailbox.CircuitBreaker$CircuitBreakerClosed$.exceedsDeadline(CircuitBreaker.scala:260)
at akka.actor.mailbox.CircuitBreaker$CircuitBreakerState$class.callThrough(CircuitBreaker.scala:177)
at akka.actor.mailbox.CircuitBreaker$CircuitBreakerClosed$.callThrough(CircuitBreaker.scala:260)
at akka.actor.mailbox.CircuitBreaker$CircuitBreakerClosed$.onCall(CircuitBreaker.scala:265)
at akka.actor.mailbox.CircuitBreaker.withCircuitBreaker(CircuitBreaker.scala:51)
at akka.actor.mailbox.DurableMessageQueue.withCircuitBreaker(DurableMailbox.scala:32)
at akka.actor.mailbox.RedisBasedMessageQueue.enqueue(RedisBasedMailbox.scala:36)
@scullxbones
Copy link
Author

Something with this logic is borked:

  def readLine: Array[Byte] = {
    if(!connected) connect
    var delimiter = crlf
    var found: List[Int] = Nil
    var build = new scala.collection.mutable.ArrayBuilder.ofByte
    while (delimiter != Nil) {
      val next = try {
        in.read
      } catch {case e => -1}
      if (next < 0) return null
      if (next == delimiter.head) {
        found ::= delimiter.head
        delimiter = delimiter.tail
      } else {
        if (found != Nil) {
          delimiter = crlf
          build ++= found.reverseMap(_.toByte)
          found = Nil
        }
        build += next.toByte
      }
    }
    build.result
  }

I took a heap dump on OOM and there's a byte[] of about 270mil negative ones. I'm thinking the next < 0 check in:

      val next = try {
        in.read
      } catch {case e => -1}
      if (next < 0) return null

Isn't working as expected for this case.

@scullxbones
Copy link
Author

Looks related to:

debasishg/scala-redis#10

Don't know what current state of this issue is though.

@scullxbones
Copy link
Author

Upgrading from 2.4.0 version of library to 2.4.2 seems to fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment