Skip to content

Instantly share code, notes, and snippets.

@prateek
Forked from johnynek/MapReduceTypeClasses.scala
Last active August 29, 2015 14:08
Show Gist options
  • Select an option

  • Save prateek/751390ff95d3de0f35c6 to your computer and use it in GitHub Desktop.

Select an option

Save prateek/751390ff95d3de0f35c6 to your computer and use it in GitHub Desktop.

Revisions

  1. @johnynek johnynek renamed this gist Oct 29, 2014. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. @johnynek johnynek created this gist Oct 29, 2014.
    98 changes: 98 additions & 0 deletions MapReduceTypeClasses
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,98 @@
    package mapreduce

    /**
    * This is an attempt to find a minimal set of type classes that describe the map-reduce programming model
    * (the underlying model of Google map/reduce, Hadoop, Spark and others)
    * The idea is to have:
    * 1) lawful types that fully constrain correctness
    * 2) a minimal set of laws (i.e. we can't remove any laws,
    * 3) able to fully express existing map/reduce in terms of these types
    *
    * This is just a draft
    */

    /**
    * law 0: filter(from(l))(p) == from(l.filter(p))
    * law 1: concat(from(l1), from(l1)) == from(l1 ++ l2)
    * law 2: concat(filter(f1)(p), filter(f2)(p)) == filter(concat(f1, f2))(p)
    */
    trait Filterable[F, T] {
    def from(iter: Iterable[T]): F
    def filter(f: F)(pred: T => Boolean): F
    def concat(a: F, b: F): F
    }

    /**
    * law 0: concatMap(filterable.from(it))(fn) == filterable.from(it.flatMap(fn))
    * law 1: concatMap(concatMap(m)(fn1))(fn2) == concatMap(m)(fn1(_).flatMap(fn2))
    */
    trait ConcatMappable[M[+_]] {
    def concatMap[T, U](m: M[T])(fn: T => Iterable[U]): M[U]
    // must also be filterable
    def filterable[T]: Filterable[M[T], T]
    }

    /**
    * law 0: mapGroup(filterable.from(kvs))(fn) ==
    * filterable.from(kvs.groupBy(_._1).mapValues { kvs =>
    * val k = kvs.head._1
    * fn(k, kvs.map(_._2))
    * }
    * law 1: mapGroup(mapGroup(r)(fn1))(fn2) = mapGroup(r) { (k, vs) => fn2(k, fn1(k, vs)) }
    */
    trait KeyedReducer[R[_, +_]] {
    def mapGroup[K, V, W](r: R[K, V])(fn: (K, Iterable[V]) => Iterable[W]): R[K, W]
    def filterable[K, V]: Filterable[R[K, V], (K, V)]
    }

    trait Grouper[M[_], R[_, _]] {
    def group[K: Ordering, V](m: M[(K, V)]): R[K, V]
    }

    /** Usual monad laws */
    trait Monad[M[+_]] {
    def apply[T](t: T): M[T]
    def flatMap[T, U](m: M[T])(fn: T => M[U]): M[U]
    }

    /**
    * law 0: write(m, name).flatMap(read(_, name)) == monad.apply(m)
    * law 1: write(group(m), name).flatMap(read(_, name)) == monad.apply(m)
    */
    trait Executor[X[+_], S[_], N] {
    def monad: Monad[X]
    def read[M[+_]: ConcatMappable, T](input: S[T], name: N): X[M[T]]
    def write[M[+_]: ConcatMappable, T](m: M[T], name: N): X[S[T]]
    def write[R[_, +_]: KeyedReducer, K, V](r: R[K, V], name: N): X[S[(K, V)]]
    }

    trait Engine {
    // Type for the mapper operation
    type M[+_]
    // Type for the reducer operation
    type R[_, +_]
    // Type for the monad the executes reads and writes
    type X[+_]
    // The source/sink type
    type S[_]
    // the name type for sources and sinks
    type N

    def concatMappable: ConcatMappable[M]
    def keyedReducer: KeyedReducer[R]
    def grouper: Grouper[M, R]
    def executor: Executor[X, S, N]
    }

    /**
    * Example in scalding
    *

    trait ScaldingEngine extends Engine {
    type M[T] = TypedPipe[T]
    type R[K, V] = KeyedList[K, V]
    type X[T] = Execution[T]
    type S[T] = Source[T] with Sink[T]
    type N = Path
    }
    */