Created
June 26, 2015 14:58
-
-
Save ggemelos/b0ae597a16fea4ded09a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start a screen session so the long-running Spark shell survives SSH disconnects.
screen -S spark

# Start Spark (PySpark shell against YARN).
# FIX: pyspark/spark-submit do not accept "--spark.foo bar" options — arbitrary
# configuration properties must be passed as "--conf key=value". The original
# command line would have been rejected by the option parser.
/opt/spark-1.3.1-bin-hadoop2.4/bin/pyspark \
  --master yarn-client \
  --num-executors 34 \
  --conf spark.yarn.executor.memoryOverhead=2000 \
  --conf spark.executor.memory=4g \
  --conf spark.shuffle.spill=true \
  --conf spark.shuffle.memoryFraction=0.6 \
  --conf spark.storage.memoryFraction=0.6 \
  --conf spark.driver.memory=4g
###
# INITIALISE PYSPARK CONSOLE (copy and paste into console)
from pyspark.sql import SQLContext
# `sc` is the SparkContext the pyspark shell creates automatically; this only
# works pasted into a running pyspark session, not as a standalone script.
sqlContext = SQLContext(sc)
# Interpret Parquet BINARY columns as strings when reading.
# NOTE(review): assumes the binary columns in these Parquet files actually hold
# string data — confirm against the jobs that wrote them.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
# Helper functions

def parseRow(row):
    """Render one row as a tab-separated string of its str()-formatted fields.

    Works on any iterable of values (Row, tuple, list); an empty row yields ''.
    """
    # PEP 8 (E731): use a def, not a lambda bound to a name, for a named helper.
    return '\t'.join(str(e) for e in row)


def saveRDDAsTSV(data, location):
    """Save an RDD of rows to `location` as tab-separated text files.

    Returns whatever saveAsTextFile returns (None in Spark).
    """
    return data.map(parseRow).saveAsTextFile(location)


def printRDD(data, maxRows=1000):
    """Print every row of `data` (tab-separated) to stdout.

    If the RDD holds more than `maxRows` rows, a warning is printed first but
    ALL rows are still printed — the cap is advisory only (original behavior).
    Note: collect() pulls the whole RDD to the driver; use only on small data.
    """
    numRows = data.count()
    if numRows > maxRows:
        # print(...) with a single argument behaves identically under
        # Python 2's print statement and Python 3's print function.
        print('Number of rows in data (%d) greater than max number of rows to print (%d)'
              % (numRows, maxRows))
    for row in data.collect():
        print(parseRow(row))
# Add data sources
# Tracked-event Parquet data for campaign 3154, exposed to SQL as table "tracked".
tracked = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/tracked-events/")
tracked.registerTempTable("tracked")
# Multi-month AddThis load kept for reference (Feb-Apr 2015), currently disabled:
#addthis = sqlContext.parquetFile("/com/addthis/parquet/event/addthis/2015/02", "/com/addthis/parquet/event/addthis/2015/03", "/com/addthis/parquet/event/addthis/2015/04")
# Single day of AddThis events (2015-03-01), exposed to SQL as table "addthis".
addthis = sqlContext.parquetFile("/com/addthis/parquet/event/addthis/2015/03/01")
addthis.registerTempTable("addthis")
# Look at data: dump each DataFrame's schema to the console for inspection.
addthis.printSchema()
tracked.printSchema()
# Distinct AddThis IP addresses (deduplicated via GROUP BY), capped at 50 rows.
# NOTE(review): this result is immediately overwritten by the next
# `data = ...` statement — it only matters when these statements are pasted
# into the console and inspected one at a time.
data = sqlContext.sql("""
SELECT
addthis.header.ip_address as atIP
FROM
addthis
GROUP BY
addthis.header.ip_address
LIMIT 50
""").cache()
# Tracked Tapad conversion/retargeting events for property 1525, aggregated to
# (IP address, device storage_key) pairs with a per-pair sighting count,
# capped at 50 rows.
# NOTE(review): like the previous query, this `data` is overwritten by the
# following statement — console-stepping artifact.
data = sqlContext.sql("""
SELECT
tracked.header.ip_address as tIP, tracked.header.device.storage_key as tDID, count(*) as numSightings
FROM
tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY
tracked.header.ip_address, tracked.header.device.storage_key
LIMIT
50
""").cache()
# Overlap report: take every tracked (IP, device) pair from the Tapad campaign
# (subquery TE, same filter as above but WITHOUT the LIMIT), LEFT OUTER JOIN it
# against the distinct AddThis IPs (subquery AT), and count how many pairs did
# vs. did not find a matching AddThis IP — grouped on `AT.atIP IS NOT NULL`,
# so the output is at most two rows: (false, n_unmatched) and (true, n_matched).
data = sqlContext.sql("""
SELECT
AT.atIP IS NOT NULL, count(*)
FROM
(SELECT
tracked.header.ip_address as tIP, tracked.header.device.storage_key as tDID, count(*) as numSightings
FROM
tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY
tracked.header.ip_address, tracked.header.device.storage_key
) TE
LEFT OUTER JOIN
(SELECT
addthis.header.ip_address as atIP
FROM
addthis
GROUP BY
addthis.header.ip_address
) AT
ON
AT.atIP = TE.tIP
GROUP BY
AT.atIP IS NOT NULL
""").cache()
# Print results (prints every collected row; warns past printRDD's maxRows).
printRDD(data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment