Last active
February 22, 2016 16:34
-
-
Save PierreMage/30a281aed21ea869f87c to your computer and use it in GitHub Desktop.
Revisions
-
PierreMage revised this gist
Feb 22, 2016 . 2 changed files with 15 additions and 7 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,7 +22,8 @@ object SessionsStatsJob { case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) { val duration: Int = end - start def add(event: Event): Session = copy(end = event.timestamp, numberOfActions = numberOfActions + 1) } case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) { @@ -33,15 +34,18 @@ object SessionsStatsJob { numberOfSessions = numberOfSessions + 1 ) def stats(): (Int, Int, Double, Double) = { val averageDuration = duration.toDouble / numberOfSessions val averageNumberOfActions = numberOfActions.toDouble / numberOfSessions (userId, numberOfSessions, averageDuration, averageNumberOfActions) } } object SessionsSummary { def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] = sessions.groupBy { case Session(userId, _, _, _) => userId } .map { case (userId, ss) => ss.foldLeft(SessionsSummary(userId, 0, 0, 0))((summary, session) => summary.add(session)) } } } @@ -57,7 +61,7 @@ object ExternalOperations { def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] = events.groupBy { case Event(_, userId) => userId } .foldLeft(List.empty[Session]) { case (head :: tail, event@Event(ts, _)) if (ts - head.end) < maxIdleTimeInMillis => head.add(event) :: tail case (sessions, Event(timestamp, userId)) => Session(userId, timestamp, timestamp, 1) :: sessions @@ -74,5 +78,6 @@ object ExternalOperations { sessions.flatMap(SessionsSummary.fromSessions).map(_.stats()) } implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]]) extends SummarizeSessions } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -19,7 +19,10 @@ class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl { } When { events: TypedPipe[Event] => events.extractSessions(10) } Then { _.toSet shouldBe Set( List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)), List(Session(2, 1027, 1029, 2)) ) } } -
PierreMage created this gist
Feb 22, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,78 @@ class SessionsStatsJob(args: Args) extends Job(args) { import ExternalOperations._ import SessionsStatsJob._ val maxIdleTimeInMillis = args.getOrElse("maxIdleTimeInMillis", "100").toInt val input = args("input") val output = args("output") val events = TypedPipe.from(TypedCsv[(Int, Int, String, String)](input)) .map { case (timestamp, userId, _, _) => Event(timestamp, userId) } events.extractSessions(maxIdleTimeInMillis) .summarizeSessions .write(TypedCsv[(Int, Int, Double, Double)](output)) } object SessionsStatsJob { case class Event(timestamp: Int, userId: Int) case class Session(userId: Int, start: Int, end: Int, numberOfActions: Int) { val duration: Int = end - start def add(event: Event): Session = copy(end = event.timestamp, numberOfActions = numberOfActions + 1) } case class SessionsSummary(userId: Int, numberOfSessions: Int, duration: Int, numberOfActions: Int) { def add(session: Session): SessionsSummary = copy( duration = duration + session.duration, numberOfActions = numberOfActions + session.numberOfActions, numberOfSessions = numberOfSessions + 1 ) def stats(): (Int, Int, Double, Double) = (userId, numberOfSessions, duration.toDouble / numberOfSessions, numberOfActions.toDouble / numberOfSessions) } object SessionsSummary { def fromSessions(sessions: Traversable[Session]): Traversable[SessionsSummary] = sessions.groupBy { case Session(userId, _, _, _) => userId } .map { case (userId, ss) => ss.foldLeft(SessionsSummary(userId, 0, 0, 0))((sessionsSummary, session) => sessionsSummary.add(session)) } } } object ExternalOperations { import SessionsStatsJob._ trait ExtractSessions { def events: TypedPipe[Event] def extractSessions(maxIdleTimeInMillis: Int): TypedPipe[List[Session]] = events.groupBy { case Event(_, userId) => userId } .foldLeft(List.empty[Session]) { case (head :: tail, event@Event(timestamp, _)) if (timestamp - head.end) < maxIdleTimeInMillis => head.add(event) :: tail case (sessions, Event(timestamp, userId)) => Session(userId, timestamp, timestamp, 1) :: sessions }.values } implicit class ExtractSessionsWrapper(val events: TypedPipe[Event]) extends ExtractSessions trait SummarizeSessions { def sessions: TypedPipe[List[Session]] def summarizeSessions: TypedPipe[(Int, Int, Double, Double)] = sessions.flatMap(SessionsSummary.fromSessions).map(_.stats()) } implicit class SummarizeSessionsWrapper(val sessions: TypedPipe[List[Session]]) extends SummarizeSessions } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,69 @@ class SessionsStatsJobSpec extends WordSpec with Matchers with TBddDsl { import SessionsStatsJob._ "ExternalOperations" should { import ExternalOperations._ "extract sessions" in { Given { List( Event(1001, 1), Event(1010, 1), Event(1019, 1), Event(1027, 1), Event(1027, 2), Event(1029, 2), Event(1037, 1) ) } When { events: TypedPipe[Event] => events.extractSessions(10) } Then { _.toSet shouldBe Set(List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)), List(Session(2, 1027, 1029, 2))) } } "summarize sessions" in { Given { List( List(Session(1, 1037, 1037, 1), Session(1, 1001, 1027, 4)), List(Session(2, 1027, 1029, 2)) ) } When { sessions: TypedPipe[List[Session]] => sessions.summarizeSessions } Then { _.toSet shouldBe Set((1, 2, 13.0, 2.5), (2, 1, 2, 2)) } } } "SessionsStatsJob" should { "calculate average session duration and average number of actions per session" in { val events = Seq( (1000, 1, "get", "session1_event1"), (1010, 1, "click", "session1_event2"), (1020, 1, "click", "session1_event3"), (1030, 1, "put", "session1_event4"), (1100, 2, "get", "session1_event1"), (1110, 2, "click", "session1_event2"), (1160, 2, "put", "session1_event3"), (1200, 1, "get", "session2_event1"), (1210, 1, "click", "session2_event2"), (1260, 1, "put", "session2_event3") ) val expectedAverages = Set( (1, 2, 45.0, 3.5), (2, 1, 60.0, 3.0) ) JobTest[SessionsStatsJob] .arg("input", "events") .arg("output", "averages") .arg("maxIdleTimeInMillis", "100") .source(TypedCsv[(Int, Int, String, String)]("events"), events) .sink[(Int, Int, Double, Double)](TypedCsv[(Int, Int, Double, Double)]("averages")) { _.toSet shouldBe expectedAverages } .run.finish } } }