Skip to content

Instantly share code, notes, and snippets.

@PierreMage
Last active February 22, 2016 16:34
Show Gist options
  • Select an option

  • Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.

Select an option

Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.

Revisions

  1. PierreMage revised this gist Feb 22, 2016. 2 changed files with 15 additions and 7 deletions.
    17 changes: 11 additions & 6 deletions SessionsStatsJob.scala
    Original file line number Diff line number Diff line change
    @@ -22,7 +22,8 @@ object SessionsStatsJob {
    case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {
    val duration: Int = end - start

    def add(event: Event): Session = copy(end = event.timestamp, numberOfActions = numberOfActions + 1)
    def add(event: Event): Session =
    copy(end = event.timestamp, numberOfActions = numberOfActions + 1)
    }

    case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {
    @@ -33,15 +34,18 @@ object SessionsStatsJob {
    numberOfSessions = numberOfSessions + 1
    )

    def stats(): (Int, Int, Double, Double) =
    (userId, numberOfSessions, duration.toDouble / numberOfSessions, numberOfActions.toDouble / numberOfSessions)
    def stats(): (Int, Int, Double, Double) = {
    val averageDuration = duration.toDouble / numberOfSessions
    val averageNumberOfActions = numberOfActions.toDouble / numberOfSessions
    (userId, numberOfSessions, averageDuration, averageNumberOfActions)
    }
    }

    object SessionsSummary {
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
    sessions.groupBy { case Session(userId, _, _, _) => userId }
    .map { case (userId, ss) =>
    ss.foldLeft(SessionsSummary(userId, 0, 0, 0))((sessionsSummary, session) => sessionsSummary.add(session))
    ss.foldLeft(SessionsSummary(userId, 0, 0, 0))((summary, session) => summary.add(session))
    }
    }
    }
    @@ -57,7 +61,7 @@ object ExternalOperations {
    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
    events.groupBy { case Event(_, userId) => userId }
    .foldLeft(List.empty[Session]) {
    case (head :: tail, event@Event(timestamp, _)) if (timestamp - head.end) < maxIdleTimeInMillis =>
    case (head :: tail, event@Event(ts, _)) if (ts - head.end) < maxIdleTimeInMillis =>
    head.add(event) :: tail
    case (sessions, Event(timestamp, userId)) =>
    Session(userId, timestamp, timestamp, 1) :: sessions
    @@ -74,5 +78,6 @@ object ExternalOperations {
    sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
    }

    implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]]) extends SummarizeSessions
    implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]])
    extends SummarizeSessions
    }
    5 changes: 4 additions & 1 deletion SessionsStatsJobSpec.scala
    Original file line number Diff line number Diff line change
    @@ -19,7 +19,10 @@ class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {
    } When {
    events: TypedPipe[Event] => events.extractSessions(10)
    } Then {
    _.toSet shouldBe Set(List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)), List(Session(2, 1027, 1029, 2)))
    _.toSet shouldBe Set(
    List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
    List(Session(2, 1027, 1029, 2))
    )
    }
    }

  2. PierreMage created this gist Feb 22, 2016.
    78 changes: 78 additions & 0 deletions SessionsStatsJob.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,78 @@
    class SessionsStatsJob(args: Args) extends Job(args) {

    import ExternalOperations._
    import SessionsStatsJob._

    val maxIdleTimeInMillis = args.getOrElse("maxIdleTimeInMillis", "100").toInt
    val input = args("input")
    val output = args("output")

    val events = TypedPipe.from(TypedCsv[(Int, Int, String, String)](input))
    .map { case (timestamp, userId, _, _) => Event(timestamp, userId) }

    events.extractSessions(maxIdleTimeInMillis)
    .summarizeSessions
    .write(TypedCsv[(Int, Int, Double, Double)](output))
    }

    object SessionsStatsJob {

    case class Event(timestamp: Int, userId: Int)

    case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) {
    val duration: Int = end - start

    def add(event: Event): Session = copy(end = event.timestamp, numberOfActions = numberOfActions + 1)
    }

    case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) {
    def add(session: Session): SessionsSummary =
    copy(
    duration = duration + session.duration,
    numberOfActions = numberOfActions + session.numberOfActions,
    numberOfSessions = numberOfSessions + 1
    )

    def stats(): (Int, Int, Double, Double) =
    (userId, numberOfSessions, duration.toDouble / numberOfSessions, numberOfActions.toDouble / numberOfSessions)
    }

    object SessionsSummary {
    def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] =
    sessions.groupBy { case Session(userId, _, _, _) => userId }
    .map { case (userId, ss) =>
    ss.foldLeft(SessionsSummary(userId, 0, 0, 0))((sessionsSummary, session) => sessionsSummary.add(session))
    }
    }
    }

    object ExternalOperations {

    import SessionsStatsJob._

    trait ExtractSessions {

    def events: TypedPipe[Event]

    def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] =
    events.groupBy { case Event(_, userId) => userId }
    .foldLeft(List.empty[Session]) {
    case (head :: tail, event@Event(timestamp, _)) if (timestamp - head.end) < maxIdleTimeInMillis =>
    head.add(event) :: tail
    case (sessions, Event(timestamp, userId)) =>
    Session(userId, timestamp, timestamp, 1) :: sessions
    }.values
    }

    implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions

    trait SummarizeSessions {

    def sessions: TypedPipe[List[Session]]

    def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] =
    sessions.flatMap(SessionsSummary.fromSessions).map(_.stats())
    }

    implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]]) extends SummarizeSessions
    }
    69 changes: 69 additions & 0 deletions SessionsStatsJobSpec.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,69 @@
    class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl {

    import SessionsStatsJob._

    "ExternalOperations" should {
    import ExternalOperations._

    "extract sessions" in {
    Given {
    List(
    Event(1001, 1),
    Event(1010, 1),
    Event(1019, 1),
    Event(1027, 1),
    Event(1027, 2),
    Event(1029, 2),
    Event(1037, 1)
    )
    } When {
    events: TypedPipe[Event] => events.extractSessions(10)
    } Then {
    _.toSet shouldBe Set(List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)), List(Session(2, 1027, 1029, 2)))
    }
    }

    "summarize sessions" in {
    Given {
    List(
    List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)),
    List(Session(2, 1027, 1029, 2))
    )
    } When {
    sessions: TypedPipe[List[Session]] => sessions.summarizeSessions
    } Then {
    _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2, 2))
    }
    }
    }

    "SessionsStatsJob" should {
    "calculate average session duration and average number of actions per session" in {
    val events = Seq(
    (1000, 1, "get", "session1_event1"),
    (1010, 1, "click", "session1_event2"),
    (1020, 1, "click", "session1_event3"),
    (1030, 1, "put", "session1_event4"),
    (1100, 2, "get", "session1_event1"),
    (1110, 2, "click", "session1_event2"),
    (1160, 2, "put", "session1_event3"),
    (1200, 1, "get", "session2_event1"),
    (1210, 1, "click", "session2_event2"),
    (1260, 1, "put", "session2_event3")
    )
    val expectedAverages = Set(
    (1, 2, 45.0, 3.5),
    (2, 1, 60.0, 3.0)
    )
    JobTest[SessionsStatsJob]
    .arg("input", "events")
    .arg("output", "averages")
    .arg("maxIdleTimeInMillis", "100")
    .source(TypedCsv[(Int, Int, String, String)]("events"), events)
    .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) {
    _.toSet shouldBe expectedAverages
    }
    .run.finish
    }
    }
    }