Last active
August 29, 2015 14:23
-
-
Save ggemelos/80087cf2cdf7d5e85948 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # GUIDE | |
| # https://spark.apache.org/docs/latest/sql-programming-guide.html#overview | |
| ### | |
| ## HOW TO START PYSPARK CONSOLE (copy and paste into terminal) | |
| ### | |
# Launch the PySpark console on YARN (client mode) with 34 executors.
# Executor and driver memory have dedicated flags; every other Spark property
# is passed as --conf key=value (bare --spark.* flags are not spark-submit options).
/opt/spark-1.3.1-bin-hadoop2.4/bin/pyspark \
  --master yarn-client \
  --num-executors 34 \
  --executor-memory 4g \
  --driver-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=2000 \
  --conf spark.shuffle.spill=true \
  --conf spark.shuffle.memoryFraction=0.6 \
  --conf spark.storage.memoryFraction=0.6
| ### | |
| ### | |
| ## INITIALISE PYSPARK CONSOLE (copy and paste into console) | |
| ### | |
| # Build the Spark SQL entry point on top of `sc`, which is not defined in this file | |
| # (the pyspark console is expected to provide it) | |
| from pyspark.sql import SQLContext | |
| sqlContext = SQLContext(sc) | |
| # Have Spark SQL read Parquet binary columns as strings | |
| sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") | |
| # (Skip this one) sqlContext.setConf("spark.sql.parquet.filterPushdown", "true") | |
| ### | |
| # To leave the console | |
| exit() | |
| ### | |
| # USEFUL FUNCTION (copy and paste into console) | |
| ### | |
def parseRow(row):
    """Render one row as a single line of tab-separated text.

    Each element of ``row`` is converted with ``str()`` and the pieces are
    joined with a tab character. An empty row yields an empty string.
    """
    return '\t'.join(str(e) for e in row)


def saveRDDAsTSV(data, location):
    """Write ``data`` to ``location`` as tab-separated text, one line per row.

    ``data`` must provide ``map``; the mapped result must provide
    ``saveAsTextFile``. Returns whatever ``saveAsTextFile`` returns.
    """
    return data.map(parseRow).saveAsTextFile(location)


def printRDD(data, maxRows=1000):
    """Print up to ``maxRows`` rows of ``data`` as tab-separated lines.

    When ``data`` holds more than ``maxRows`` rows a warning line is printed
    first and only the first ``maxRows`` rows follow, so an unexpectedly large
    result is never pulled to the console in full.
    """
    numRows = data.count()
    if numRows > maxRows:
        # Single-argument print(...) behaves the same on Python 2 and Python 3.
        print('Number of rows in data (%d) greater than max number of rows to print (%d)' % (numRows, maxRows))
    # take() fetches at most maxRows rows instead of collecting everything.
    for row in data.take(maxRows):
        print(parseRow(row))
| ### | |
| ### | |
| ## EXAMPLES | |
| ### | |
| ## Look at impressions and clicks for campaign 3615 | |
| # Point to taps: load the Parquet data and register it as the SQL table "taps" | |
| taps = sqlContext.parquetFile("/com/tapad/parquet/event/tap/2015/05/03/12/") | |
| taps.registerTempTable("taps") | |
| # Show the data schema | |
| taps.printSchema() | |
| # Count the taps for campaign 3615, broken out by action, and write the result to HDFS at "/user/george.gemelos/sparkSQLTest" | |
| # Note: the triple quotes (""") let a long string span multiple lines | |
| # cache() keeps the result so the save and the count below can reuse it | |
| data = sqlContext.sql(""" | |
| SELECT | |
| taps.action_id, count(*) | |
| FROM taps | |
| WHERE | |
| taps.campaign_id ='3615' | |
| GROUP BY taps.action_id | |
| """).cache() | |
| # Note: saveAsTextFile (used by saveRDDAsTSV) fails if the output path already exists | |
| saveRDDAsTSV(data, "/user/george.gemelos/sparkSQLTest") | |
| # Find out how many rows | |
| data.count() | |
| # Count the taps for campaign 3615, broken out by platform and action, and print the result to the console | |
| data = sqlContext.sql(""" | |
| SELECT | |
| taps.header.platform, taps.action_id, count(*) | |
| FROM taps | |
| WHERE | |
| taps.campaign_id ='3615' | |
| GROUP BY taps.action_id, taps.header.platform | |
| """).cache() | |
| printRDD(data) | |
| ## Find conversion events for campaign 3154 | |
| # Conversion events of interest, as (property ID, action ID) pairs: | |
| # 1525 D-ACQ-TAPAD-CONVERSION | |
| # 1525 DC-Tapad-Conv-2015 | |
| # 1525 D-ACQ-TAPAD-RETARGETING | |
| # Point to tracked: load the tracked events and register them as the SQL table "tracked" | |
| tracked = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/tracked-events/") | |
| tracked.registerTempTable("tracked") | |
| # Point to taps: registers "taps" again, this time pointing at the campaign 3154 data | |
| taps = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/taps") | |
| taps.registerTempTable("taps") | |
| # Find counts: number of tracked events per action ID for the three (property ID, action ID) pairs | |
| data = sqlContext.sql(""" | |
| SELECT tracked.action_id, count(*) | |
| FROM tracked | |
| WHERE | |
| (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING') | |
| GROUP BY tracked.action_id | |
| """).cache() | |
| printRDD(data) | |
| # Find conversions with and without impressions | |
| # TE: distinct (device storage key, action ID) pairs for the three conversion actions of property 1525 | |
| # IMP: distinct device storage keys that have an 'impression' tap for campaign 3154 | |
| # The LEFT OUTER JOIN keeps every TE row; "IMP.impDID is not null" is true when the device also appears in IMP | |
| # Result: per action ID, the number of TE rows with and without a matching impression | |
| data = sqlContext.sql(""" | |
| SELECT TE.aid, IMP.impDID is not null, count(*) | |
| FROM | |
| ( | |
| SELECT tracked.header.device.storage_key as did, tracked.action_id as aid | |
| From tracked | |
| WHERE (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING') | |
| GROUP BY tracked.header.device.storage_key, tracked.action_id | |
| ) TE | |
| LEFT OUTER JOIN | |
| ( | |
| SELECT taps.header.device.storage_key as impDID | |
| FROM taps | |
| WHERE taps.campaign_id ='3154' AND taps.action_id = 'impression' | |
| GROUP BY taps.header.device.storage_key | |
| ) IMP | |
| ON TE.did = IMP.impDID | |
| GROUP BY TE.aid, IMP.impDID is not null | |
| """).cache() | |
| printRDD(data) | |
| # Find conversions with and without impressions, broken out by platform | |
| # TE: distinct (device storage key, device platform, action ID) triples for the three conversion actions of property 1525 | |
| # IMP: distinct device storage keys that have an 'impression' tap for campaign 3154 | |
| # The LEFT OUTER JOIN keeps every TE row; "IMP.impDID is not null" is true when the device also appears in IMP | |
| # Result: per action ID and platform, the number of TE rows with and without a matching impression, ordered by action ID and platform | |
| data = sqlContext.sql(""" | |
| SELECT TE.aid, TE.plat, IMP.impDID is not null, count(*) | |
| FROM | |
| ( | |
| SELECT | |
| tracked.header.device.storage_key as did, tracked.header.device.platform as plat, tracked.action_id as aid | |
| FROM tracked | |
| WHERE | |
| (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015') | |
| OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING') | |
| GROUP BY | |
| tracked.header.device.storage_key, tracked.header.device.platform, tracked.action_id | |
| ) TE | |
| LEFT OUTER JOIN | |
| ( | |
| SELECT | |
| taps.header.device.storage_key as impDID | |
| FROM taps | |
| WHERE | |
| taps.campaign_id ='3154' AND taps.action_id = 'impression' | |
| GROUP BY | |
| taps.header.device.storage_key | |
| ) IMP | |
| ON TE.did = IMP.impDID | |
| GROUP BY TE.aid, TE.plat, IMP.impDID is not null | |
| ORDER BY TE.aid, TE.plat | |
| """).cache() | |
| printRDD(data) | |
| ### | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment