Skip to content

Instantly share code, notes, and snippets.

@ggemelos
Created June 26, 2015 14:58
Show Gist options
  • Select an option

  • Save ggemelos/b0ae597a16fea4ded09a to your computer and use it in GitHub Desktop.

Select an option

Save ggemelos/b0ae597a16fea4ded09a to your computer and use it in GitHub Desktop.
# Start a detachable screen session so the long-running shell survives disconnects
screen -S spark
# Start Spark
# FIX: Spark configuration properties must be passed as `--conf key=value`;
# the original `--spark.executor.memory 4g` style is not a valid
# spark-submit/pyspark flag and the launcher rejects it.
/opt/spark-1.3.1-bin-hadoop2.4/bin/pyspark \
  --master yarn-client \
  --num-executors 34 \
  --conf spark.yarn.executor.memoryOverhead=2000 \
  --conf spark.executor.memory=4g \
  --conf spark.shuffle.spill=true \
  --conf spark.shuffle.memoryFraction=0.6 \
  --conf spark.storage.memoryFraction=0.6 \
  --conf spark.driver.memory=4g
###
# INITIALISE PYSPARK CONSOLE (copy and paste into console)
# `sc` (the SparkContext) is predefined by the pyspark shell.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Read Parquet BINARY columns back as strings, so string-typed fields stored
# as raw binary by other writers come out as str rather than bytearrays.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
# Helper functions
def parseRow(row):
    """Join a row's fields into one tab-separated string.

    str() is applied to each field so non-string columns (ints, None, ...)
    serialize without error.
    """
    return '\t'.join(str(e) for e in row)

def saveRDDAsTSV(data, location):
    """Save an RDD of rows as tab-separated text files under `location`."""
    # Returns whatever saveAsTextFile returns, matching the original lambda.
    return data.map(parseRow).saveAsTextFile(location)
def printRDD(data, maxRows=1000):
    """Print every row of `data` as one tab-separated line on stdout.

    Warns first when the RDD holds more than `maxRows` rows, since
    collect() pulls the whole dataset onto the driver.
    """
    numRows = data.count()
    if numRows > maxRows:
        # NOTE(review): indentation was lost in the original paste; this
        # reconstruction only warns and still prints all rows — confirm the
        # intent was not to truncate output at maxRows.
        print('Number of rows in data (%d) greater than max number of rows to print (%d)' % (numRows, maxRows))
    # Parenthesized print form works on both Python 2 and 3 (the original
    # Py2-only print statements would not parse under Python 3).
    for row in data.collect():
        print(parseRow(row))
# Add data sources
# Load the tracked-events Parquet data and expose it to Spark SQL as "tracked".
tracked = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/tracked-events/")
tracked.registerTempTable("tracked")
# Multi-month load kept for reference; a single day is loaded below instead —
# presumably to keep these exploratory queries fast.
#addthis = sqlContext.parquetFile("/com/addthis/parquet/event/addthis/2015/02", "/com/addthis/parquet/event/addthis/2015/03", "/com/addthis/parquet/event/addthis/2015/04")
addthis = sqlContext.parquetFile("/com/addthis/parquet/event/addthis/2015/03/01")
addthis.registerTempTable("addthis")
# Look at data
# Inspect both schemas before writing the SQL queries below.
addthis.printSchema()
tracked.printSchema()
# Query 1: distinct AddThis IPs (GROUP BY dedupes), capped at 50 rows as a
# quick sanity check.
# NOTE(review): `data` is reassigned by the next two queries, so this result
# is only useful if inspected before running them.
data = sqlContext.sql("""
SELECT
addthis.header.ip_address as atIP
FROM
addthis
GROUP BY
addthis.header.ip_address
LIMIT 50
""").cache()
# Query 2: Tapad sightings per (IP, device storage key) pair in the tracked
# events for property 1525, limited to 50 rows for inspection.
# NOTE(review): overwrites `data` from the previous query.
data = sqlContext.sql("""
SELECT
tracked.header.ip_address as tIP, tracked.header.device.storage_key as tDID, count(*) as numSightings
FROM
tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY
tracked.header.ip_address, tracked.header.device.storage_key
LIMIT
50
""").cache()
# Query 3: for each deduped tracked (IP, device) pair, test whether its IP
# also appears in the AddThis data. Left-outer-joins the tracked pairs (TE)
# against distinct AddThis IPs (AT) and counts pairs that matched
# (atIP IS NOT NULL = true) vs did not (false) — i.e. the IP overlap between
# the two datasets.
data = sqlContext.sql("""
SELECT
AT.atIP IS NOT NULL, count(*)
FROM
(SELECT
tracked.header.ip_address as tIP, tracked.header.device.storage_key as tDID, count(*) as numSightings
FROM
tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY
tracked.header.ip_address, tracked.header.device.storage_key
) TE
LEFT OUTER JOIN
(SELECT
addthis.header.ip_address as atIP
FROM
addthis
GROUP BY
addthis.header.ip_address
) AT
ON
AT.atIP = TE.tIP
GROUP BY
AT.atIP IS NOT NULL
""").cache()
# Print results
# Two rows expected: counts of matched (true) and unmatched (false) pairs.
printRDD(data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment