Skip to content

Instantly share code, notes, and snippets.

@9len
Created May 25, 2012 16:49
Show Gist options
  • Select an option

  • Save 9len/2789151 to your computer and use it in GitHub Desktop.

Select an option

Save 9len/2789151 to your computer and use it in GitHub Desktop.
package com.twitter.servo.repository
import com.twitter.conversions.time._
import com.twitter.servo.cache._
import com.twitter.logging.Logger
import com.twitter.util.{Duration, Future, Return, Time, Throw, Try}
import scala.collection.mutable
import scala.util.Random
/**
* Base type for the possible states of a single key's cache lookup. The concrete
* states are the case classes declared in the CachedResult companion object.
*
* Both type parameters are covariant, which lets the value-less states be declared
* as CachedResult[K, Nothing].
*/
sealed abstract class CachedResult[+K, +V] {
/** The key this result refers to. */
def key: K
}
/**
 * The concrete [[CachedResult]] states, the handler type aliases, and a set of stock
 * handlers that translate a CachedResult into a CachedResultAction.
 */
object CachedResult {
  import CachedResultAction._
  import CachingKeyValueRepository._

  /** Indicates the key was not in cache */
  case class NotFound[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates there was an error fetching the key */
  case class Failed[K](key: K, t: Throwable) extends CachedResult[K, Nothing]

  /** Indicates the cached value could not be deserialized */
  case class DeserializationFailed[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates the cached value could not be serialized */
  case class SerializationFailed[K](key: K) extends CachedResult[K, Nothing]

  /** Indicates that a NotFound tombstone was found in cache */
  case class CachedNotFound[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

  /** Indicates that a Deleted tombstone was found in cache */
  case class CachedDeleted[K](key: K, cachedAt: Time) extends CachedResult[K, Nothing]

  /** Indicates that a value was found in cache */
  case class CachedFound[K, V](key: K, value: V, cachedAt: Time) extends CachedResult[K, V]

  /** A total function deciding which action to take for a CachedResult. */
  type Handler[K, V] = CachedResult[K, V] => CachedResultAction[V]

  /** Like [[Handler]], but may decline to decide by returning None. */
  type PartialHandler[K, V] = CachedResult[K, V] => Option[CachedResultAction[V]]

  /** Produces the [[Handler]] to use for a given query. */
  type HandlerFactory[Q <: Seq[K], K, V] = Q => Handler[K, V]

  /**
   * Constructors and combinators for [[PartialHandler]] functions.
   *
   * This is an object rather than a val holding an anonymous class so that its members
   * are invoked directly instead of reflectively through a structural type.
   */
  object PartialHandler {
    /**
     * Sugar to produce a PartialHandler from a PartialFunction. Successive calls to
     * isDefinedAt MUST return the same result. Otherwise, take the syntax hit and wire
     * up your own PartialHandler.
     */
    def apply[K, V](
      partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
    ): PartialHandler[K, V] = partial.lift

    /**
     * compose one PartialHandler with another to produce a new PartialHandler.
     * thatHandler is only consulted when thisHandler returns None.
     */
    def compose[K, V](
      thisHandler: PartialHandler[K, V],
      thatHandler: PartialHandler[K, V]
    ): PartialHandler[K, V] = { cachedResult =>
      thisHandler(cachedResult) orElse thatHandler(cachedResult)
    }

    /**
     * terminate a PartialHandler with the default Handler.
     *
     * This is a separate overload (not a default argument) because scalac allows only
     * one overloaded alternative of a method to declare default arguments.
     */
    def terminate[K, V](partial: PartialHandler[K, V]): Handler[K, V] =
      terminate(partial, defaultHandler[K, V])

    /**
     * terminate a PartialHandler to produce a new Handler
     *
     * @param partial
     *   consulted first
     * @param handler
     *   used whenever partial returns None
     */
    def terminate[K, V](
      partial: PartialHandler[K, V],
      handler: Handler[K, V]
    ): Handler[K, V] = { cachedResult =>
      partial(cachedResult).getOrElse(handler(cachedResult))
    }

    /**
     * lift a PartialFunction to a PartialHandler and terminate
     */
    def terminate[K, V](
      partial: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
      handler: Handler[K, V] = defaultHandler[K, V]
    ): Handler[K, V] = terminate(apply(partial), handler)
  }

  /** A HandlerFactory that ignores the query and always yields the default Handler. */
  def defaultHandlerFactory[Q <: Seq[K], K, V]: HandlerFactory[Q, K, V] =
    _ => defaultHandler[K, V]

  /**
   * This is the default Handler. Failures are treated as misses.
   */
  def defaultHandler[K, V]: Handler[K, V] = {
    case NotFound(_) | Failed(_, _) => HandleAsMiss
    case DeserializationFailed(_) | SerializationFailed(_) => HandleAsMiss
    case CachedNotFound(_, _) | CachedDeleted(_, _) => HandleAsNotFound
    case CachedFound(_, value, _) => HandleAsFound(value)
  }

  /**
   * A PartialHandler that bubbles memcache failures up instead of converting
   * those failures to misses.
   */
  def failuresAreFailures[K, V]: PartialHandler[K, V] = PartialHandler[K, V] {
    case Failed(_, t) => HandleAsFailed(t)
  }

  /** True when the expiry time computed for an entry lies before the current time. */
  private def isExpired(cachedAt: Time, ttl: Duration, expiry: Expiry): Boolean =
    expiry(cachedAt, ttl) < Time.now

  /**
   * soft-expires CachedFound and CachedNotFound based on a ttl.
   *
   * @param ttl
   *   values older than this will be considered expired, but still
   *   returned, and asynchronously refreshed in cache.
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def softTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedFound(_, value, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      SoftExpiration(HandleAsFound(value))
    case CachedNotFound(_, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      SoftExpiration(HandleAsNotFound)
  }

  /**
   * hard-expires CachedFound and CachedNotFound based on a ttl
   *
   * @param ttl
   *   values older than this will be considered a miss
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def hardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedFound(_, _, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      HandleAsMiss
    case CachedNotFound(_, cachedAt) if isExpired(cachedAt, ttl, expiry) =>
      HandleAsMiss
  }

  /**
   * hard-expires a CachedNotFound tombstone based on a ttl. Unlike hardTtlExpiration,
   * this handler is defined for every CachedNotFound: unexpired tombstones are
   * handled as not-found.
   *
   * @param ttl
   *   values older than this will be considered expired
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def notFoundHardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedNotFound(_, cachedAt) =>
      if (isExpired(cachedAt, ttl, expiry)) HandleAsMiss else HandleAsNotFound
  }

  /**
   * hard-expires a CachedDeleted tombstone based on a ttl. This handler is defined for
   * every CachedDeleted: unexpired tombstones are handled as not-found.
   *
   * @param ttl
   *   values older than this will be considered expired
   * @param expiry
   *   (optional) function to compute the expiry time
   */
  def deletedHardTtlExpiration[K, V](
    ttl: Duration,
    expiry: Expiry = fixedExpiry
  ): PartialHandler[K, V] = PartialHandler[K, V] {
    case CachedDeleted(_, cachedAt) =>
      if (isExpired(cachedAt, ttl, expiry)) HandleAsMiss else HandleAsNotFound
  }

  /**
   * if shouldReadThrough evaluates to false, CachedResults are returned exclusively
   * from cache, with no read through to the underlying KeyValueRepository. Otherwise,
   * the readThroughHandler is used.
   */
  def gatedReadThrough[Q <: Seq[K], K, V](
    shouldReadThrough: Q => Boolean,
    readThroughHandler: Handler[K, V]
  ): HandlerFactory[Q, K, V] = { query =>
    if (shouldReadThrough(query)) readThroughHandler else cacheOnly[K, V]
  }

  /**
   * read only from cache, never fall back to underlying KeyValueRepository.
   * Anything other than a CachedFound (including failures) is handled as not-found.
   */
  def cacheOnly[K, V]: Handler[K, V] = {
    case CachedFound(_, value, _) => HandleAsFound(value)
    case _ => HandleAsNotFound
  }
}
/**
 * Type aliases and stock functions used to configure a CachingKeyValueRepository.
 */
object CachingKeyValueRepository {
  /**
   * A function that takes a cachedAt time and ttl, and returns an expiry time. This function
   * _must_ be deterministic with respect to the arguments provided, otherwise, you might get a
   * MatchError when using this with softTtlExpiration.
   */
  type Expiry = (Time, Duration) => Time

  /**
   * An Expiry function with an epsilon of zero.
   */
  val fixedExpiry: Expiry = (cachedAt: Time, ttl: Duration) => cachedAt + ttl

  /**
   * A repeatable "random" expiry function that perturbs the ttl with a random value
   * no greater than +/-(ttl * maxFactor).
   *
   * @param maxFactor
   *   the maximum fraction of the ttl by which the expiry may be shifted; zero yields
   *   fixedExpiry
   */
  def randomExpiry(maxFactor: Float): Expiry = {
    if (maxFactor == 0) {
      fixedExpiry
    } else {
      (cachedAt: Time, ttl: Duration) => {
        // Seeding with cachedAt keeps the result a pure function of the arguments,
        // as the Expiry contract requires.
        val factor = (2 * new Random(cachedAt.inMilliseconds).nextFloat - 1) * maxFactor
        // Scale the ttl's millisecond count explicitly: a Float cannot be multiplied by
        // a Duration directly, and the product is converted back with `.milliseconds`.
        val jitterMillis = (factor * ttl.inMilliseconds).toLong
        cachedAt + ttl + jitterMillis.milliseconds
      }
    }
  }

  /**
   * A function that implements a read-repair operation.
   */
  type ReadRepairer[Q, V] = (Q, Seq[V]) => Future[Seq[Try[V]]]

  /**
   * Implements the ReadRepairer type, but just throws an exception.
   */
  val noRepair: (Any, Seq[Any]) => Nothing =
    (_: Any, _: Seq[Any]) => throw new UnsupportedOperationException
}
import CachingKeyValueRepository._
/**
* Reads keyed values from a LockingCache, and reads through to an underlying
* KeyValueRepository for misses. supports a "soft ttl", beyond which values
* will be read through out-of-band to the originating request
*
* @param underlying
* the underlying KeyValueRepository
* @param cache
* the locking cache to read from
* @param newQuery
* a function for converting a subset of the keys of the original query into a new
* query. this is used to construct the query passed to the underlying repository
* to fetch the cache misses.
* @param handlerFactory
* a function that, given the query, produces the handler specifying the policy for
* results read from cache, e.g. whether cache failures are handled as misses or as failures.
* @param picker
* used to choose between the value in cache and the value read from the DB when
* storing values in the cache
* @param observer
* a CacheObserver for collecting cache statistics
* @param readRepairer
* a function that can repair cached items. this is only meaningful if the cachedResultHandler
* can potentially return a value of ReadRepair.
*/
class CachingKeyValueRepository[Q <: Seq[K], K, V](
  underlying: KeyValueRepository[Q, K, V],
  cache: LockingCache[K, Cached[V]],
  newQuery: SubqueryBuilder[Q, K],
  handlerFactory: CachedResult.HandlerFactory[Q, K, V] =
    CachedResult.defaultHandlerFactory[Q, K, V],
  picker: LockingCache.Picker[Cached[V]] = new PreferNewestCached[V],
  observer: CacheObserver = NullCacheObserver,
  readRepairer: ReadRepairer[Q, V] = noRepair)
  extends KeyValueRepository[Q, K, V]
{
  // scalac allows only one overloaded alternative of a constructor to declare default
  // arguments, and the primary constructor already does. The deprecated
  // PartialFunction-based constructor therefore spells out its optional trailing
  // parameters as explicit overloads instead of defaults.

  @deprecated("Use HandlerFactory instead of PartialFunction")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]],
    observer: CacheObserver,
    readRepairer: ReadRepairer[Q, V]
  ) = this(
    underlying,
    cache,
    newQuery,
    // the same terminated handler is used regardless of the query
    (_: Q) => CachedResult.PartialHandler.terminate(partialHandler),
    picker,
    observer,
    readRepairer
  )

  @deprecated("Use HandlerFactory instead of PartialFunction")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]],
    observer: CacheObserver
  ) = this(underlying, cache, newQuery, partialHandler, picker, observer, noRepair)

  @deprecated("Use HandlerFactory instead of PartialFunction")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]],
    picker: LockingCache.Picker[Cached[V]]
  ) = this(underlying, cache, newQuery, partialHandler, picker, NullCacheObserver, noRepair)

  @deprecated("Use HandlerFactory instead of PartialFunction")
  def this(
    underlying: KeyValueRepository[Q, K, V],
    cache: LockingCache[K, Cached[V]],
    newQuery: SubqueryBuilder[Q, K],
    partialHandler: PartialFunction[CachedResult[K, V], CachedResultAction[V]]
  ) = this(
    underlying,
    cache,
    newQuery,
    partialHandler,
    new PreferNewestCached[V],
    NullCacheObserver,
    noRepair
  )

  import CachedResult._
  import CachedResultAction._

  protected[this] val log = Logger.get(getClass.getSimpleName)

  // stats scope in which soft-expired keys are counted as misses (see recordCacheStats)
  protected[this] val effectiveCacheStats = observer.scope("effective")

  /**
   * The cache lookup results for one query, bucketed by the action chosen for each key.
   */
  protected case class ProcessedCacheResult(
    hits: Map[K, V],
    misses: Seq[K],
    failures: Map[K, Throwable],
    tombstones: Set[K],
    softExpirations: Seq[K],
    repairs: Seq[(K, V)])

  /**
   * Looks the keys up in cache, reads misses through to the underlying repository,
   * read-repairs the items flagged for repair, and merges the three results.
   * Soft-expired keys are refreshed out-of-band; that refresh does not affect the
   * returned result.
   */
  override def apply(keys: Q): Future[KeyValueResult[K, V]] = {
    getFromCache(keys) flatMap { cacheResult =>
      val ProcessedCacheResult(hits, misses, failures, tombstones, softExpirations, repairs) =
        process(keys, cacheResult)
      val (repairKeys, repairItems) = repairs.unzip

      recordCacheStats(keys, misses.toSet, softExpirations.toSet)

      // now read through all notFound
      val futureFromUnderlying = readThrough(newQuery(misses, keys))
      val futureFromRepair = readRepair(newQuery(repairKeys, keys), repairItems)

      // async read-through for the expired results: the result is ignored, but a
      // failure is logged so that a broken refresh path does not go unnoticed
      readThrough(newQuery(softExpirations, keys)) onFailure { case t =>
        log.error(t, "exception caught in soft-expiration read-through")
      }

      // merge all results together
      for {
        fromUnderlying <- futureFromUnderlying
        fromRepair <- futureFromRepair
        fromCache = KeyValueResult(hits, tombstones, failures)
      } yield KeyValueResult.sum(Seq(fromCache, fromUnderlying, fromRepair))
    }
  }

  /**
   * Fetches the distinct keys from cache. If the cache call as a whole fails, the
   * failure is logged and reported as a per-key failure for every requested key.
   */
  protected[this] def getFromCache(keys: Seq[K]): Future[KeyValueResult[K, Cached[V]]] = {
    val uniqueKeys = keys.distinct
    cache.get(uniqueKeys) handle { case t =>
      log.error(t, "exception caught in cache get")
      // treat total cache failure as a fetch that returned all failures
      KeyValueResult(failed = uniqueKeys.map { _ -> t }.toMap)
    }
  }

  /**
   * Buckets cache results according to the wishes of the Handler produced by the
   * handlerFactory for this query.
   *
   * Each distinct key is processed once, mirroring the de-duplication done by
   * getFromCache, so a key repeated in the query cannot be read through, repaired or
   * refreshed more than once.
   */
  protected[this] def process(
    keys: Q,
    cacheResult: KeyValueResult[K, Cached[V]]
  ): ProcessedCacheResult = {
    val cachedResultHandler = handlerFactory(keys)

    val hits = new mutable.HashMap[K, V]
    val misses = new mutable.ListBuffer[K]
    val failures = new mutable.HashMap[K, Throwable]
    val tombstones = new mutable.ListBuffer[K]
    val softExpiredKeys = new mutable.ListBuffer[K]
    val repairs = new mutable.ListBuffer[(K, V)]

    // translate the raw cache response for a key into a CachedResult
    def classify(key: K): CachedResult[K, V] =
      cacheResult(key) match {
        case Throw(t) => Failed(key, t)
        case Return(None) => NotFound(key)
        case Return(Some(cached)) =>
          cached.status match {
            case CachedValueStatus.Found =>
              cached.value match {
                // a Found entry that carries no value is treated as not in cache
                case None => NotFound(key)
                case Some(value) => CachedFound(key, value, cached.cachedAt)
              }
            case CachedValueStatus.NotFound => CachedNotFound(key, cached.cachedAt)
            case CachedValueStatus.Deleted => CachedDeleted(key, cached.cachedAt)
            case CachedValueStatus.SerializationFailed => SerializationFailed(key)
            case CachedValueStatus.DeserializationFailed => DeserializationFailed(key)
            case CachedValueStatus.Evicted => NotFound(key)
          }
      }

    // file the key into the bucket matching the chosen action
    def record(key: K, action: CachedResultAction[V]): Unit =
      action match {
        case HandleAsMiss => misses += key
        case HandleAsFound(value) => hits(key) = value
        case HandleAsNotFound => tombstones += key
        case HandleAsFailed(t) => failures(key) = t
        case ReadRepair(value) => repairs += (key -> value)
        case SoftExpiration(subaction) =>
          // remember the key for refresh, then bucket it by the wrapped action
          softExpiredKeys += key
          record(key, subaction)
      }

    keys.distinct foreach { key =>
      record(key, cachedResultHandler(classify(key)))
    }

    // hand out immutable snapshots rather than the mutable builders
    ProcessedCacheResult(
      hits.toMap,
      misses.toList,
      failures.toMap,
      tombstones.toSet,
      softExpiredKeys.toList,
      repairs.toList)
  }

  /**
   * Records one hit or miss per requested key on both stat receivers. The "effective"
   * stats count soft-expired keys as misses; the plain observer counts only keys in
   * notFound as misses.
   */
  protected[this] def recordCacheStats(keys: Seq[K], notFound: Set[K], expired: Set[K]): Unit = {
    keys foreach { key =>
      val name = key.toString
      val missed = notFound.contains(key)

      if (missed || expired.contains(key))
        effectiveCacheStats.miss(name)
      else
        effectiveCacheStats.hit(name)

      if (missed)
        observer.miss(name)
      else
        observer.hit(name)
    }
  }

  /**
   * read through to the underlying repository, writing a successful result back to
   * cache. An empty query short-circuits to an empty result.
   *
   * @param keys
   *   the keys to read
   */
  def readThrough(keys: Q): Future[KeyValueResult[K, V]] = {
    if (keys.isEmpty) {
      KeyValueResult.emptyFuture
    } else {
      underlying(keys) onSuccess { result =>
        writeToCache(keys, result)
      }
    }
  }

  /**
   * Read-repairs the specified items, writing the updated results back to cache.
   * An empty query short-circuits to an empty result.
   */
  def readRepair(keys: Q, items: Seq[V]): Future[KeyValueResult[K, V]] = {
    if (keys.isEmpty) {
      KeyValueResult.emptyFuture
    } else {
      KeyValueResult.fromSeqTry(keys) {
        readRepairer(keys, items)
      } onSuccess { result =>
        writeToCache(keys, result)
      }
    }
  }

  /**
   * Writes the contents of the given KeyValueResult to cache. Found values are stored
   * with status Found, absent values as NotFound tombstones, and failed keys are
   * skipped. Failures of individual cache writes are logged and otherwise ignored.
   */
  def writeToCache(keys: Q, result: KeyValueResult[K, V]): Unit = {
    // template entry; evaluated at most once so every entry written by this call
    // shares the same timestamp
    lazy val cachedEmpty = {
      val now = Time.now
      Cached[V](None, CachedValueStatus.NotFound, now, Some(now))
    }

    keys foreach { key =>
      // only cache Returns from the underlying repo, skip Throws
      val toWrite = result(key) match {
        case Return(Some(value)) =>
          Some(cachedEmpty.copy(value = Some(value), status = CachedValueStatus.Found))
        case Return(None) => Some(cachedEmpty)
        case Throw(_) => None
      }

      toWrite foreach { cached =>
        cache.lockAndSet(key, LockingCache.PickingHandler(cached, picker)) onFailure { case t =>
          log.error(t, "exception caught in lockAndSet")
        }
      }
    }
  }
}
@evnm
Copy link
Copy Markdown

evnm commented May 29, 2012

I'm into this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment