import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit class MaxLeaseRetriesException(message:String = null, cause:Throwable = null) extends Exception(message, cause) class ConnectionPool[R](maxSize:Int, timeout:Int = 1000, unit: TimeUnit = TimeUnit.MILLISECONDS) { val q: BlockingQueue[R] = new LinkedBlockingQueue[R](maxSize) /** * Given an initialization block, fill the connection pool. */ def construct(block: ()=> R) { for (_ <- 0 to maxSize) { q.put(block()) } } /** * Lease a connection from the pool for the duration of a code block * Use the default timeout to try and acquire the lease. Retry `maxRetries` times * to acquire the lease. * * Throws a MaxLeaseRetriesException if the maxRetries is hit with no luck. */ def lease[T](block: R => T, maxRetries:Int=10): T = { var con:R = q.poll(timeout, unit) var tries:Int = 1 while (con == null && tries <= maxRetries) { con = q.poll(timeout, unit) tries+=1 } if (con == null) { throw new MaxLeaseRetriesException(s"Tried to acquire connection lease ${maxRetries} times with no success!") } val result = block(con) q.put(con) result } }