Skip to content

Instantly share code, notes, and snippets.

@ggemelos
Last active August 29, 2015 14:23
Show Gist options
  • Select an option

  • Save ggemelos/80087cf2cdf7d5e85948 to your computer and use it in GitHub Desktop.

Select an option

Save ggemelos/80087cf2cdf7d5e85948 to your computer and use it in GitHub Desktop.
# GUIDE
# https://spark.apache.org/docs/latest/sql-programming-guide.html#overview
###
## HOW TO START PYSPARK CONSOLE (copy and paste into terminal)
###
# Spark properties must be passed as "--conf key=value"; driver and executor memory have dedicated flags.
# NOTE(review): shuffle.memoryFraction + storage.memoryFraction add up to 1.2 - confirm these values are intended.
/opt/spark-1.3.1-bin-hadoop2.4/bin/pyspark --master yarn-client --num-executors 34 --driver-memory 4g --executor-memory 4g --conf spark.yarn.executor.memoryOverhead=2000 --conf spark.shuffle.spill=true --conf spark.shuffle.memoryFraction=.6 --conf spark.storage.memoryFraction=.6
###
###
## INITIALISE PYSPARK CONSOLE (copy and paste into console)
###
# SQLContext is the object used below to read parquet files and to run SQL queries.
from pyspark.sql import SQLContext
# `sc` is not defined in this file - presumably the SparkContext that the pyspark console provides.
sqlContext = SQLContext(sc)
# Going by the option name, parquet binary columns are read back as strings (see the Spark SQL guide linked above).
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
# (Skip this one) sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
###
# To leave the console (run this last: anything pasted after it will not reach the console)
exit()
###
# USEFUL FUNCTIONS (copy and paste into console)
###
def parseRow(row):
    """Return the fields of `row` joined into one tab-separated string.

    Every field is converted with str(), so None becomes 'None'.
    NOTE(review): under Python 2, str() raises UnicodeEncodeError on unicode
    fields containing non-ASCII characters - confirm the data is ASCII.
    """
    return '\t'.join(str(e) for e in row)


def saveRDDAsTSV(data, location):
    """Write `data` to `location` as text, one tab-separated line per row.

    Each row is formatted with parseRow, then saveAsTextFile(location) is
    called on the mapped result and its return value is passed back.
    """
    return data.map(parseRow).saveAsTextFile(location)


def printRDD(data, maxRows = 1000):
    """Print the rows of `data` to the console as tab-separated lines.

    At most `maxRows` rows are printed. When `data` holds more rows than
    that, a notice with both numbers is printed first and only the first
    `maxRows` rows follow.

    data    -- object offering count(), collect() and take(n)
    maxRows -- upper bound on the number of rows printed (default 1000)
    """
    numRows = data.count()
    if numRows > maxRows:
        print('Number of rows in data (%d) greater than max number of rows to print (%d)' % (numRows, maxRows))
        # Fetch only what will be shown instead of pulling the whole,
        # possibly very large, data set back to the console.
        rows = data.take(maxRows)
    else:
        rows = data.collect()
    for row in rows:
        print(parseRow(row))
###
###
## EXAMPLES
###
## Look at impressions and clicks for campaign 3615
# Point to taps: load the parquet data stored under this path
taps = sqlContext.parquetFile("/com/tapad/parquet/event/tap/2015/05/03/12/")
# Register it under the name "taps" so the SQL queries below can refer to it
taps.registerTempTable("taps")
# Show the data schema
taps.printSchema()
# Count the taps of campaign 3615 per action and write the result to
# HDFS under "/user/george.gemelos/sparkSQLTest".
# Note: a triple-quoted string (""") lets the query span several lines.
data = sqlContext.sql("""
SELECT
taps.action_id, count(*)
FROM taps
WHERE
taps.campaign_id ='3615'
GROUP BY taps.action_id
""")
# Cache the result: it is used twice below (once to save, once to count).
data = data.cache()
saveRDDAsTSV(data, "/user/george.gemelos/sparkSQLTest")
# Number of rows in the result
data.count()
# Count the taps of campaign 3615 per platform and action, and print the
# result to the console.
data = sqlContext.sql("""
SELECT
taps.header.platform, taps.action_id, count(*)
FROM taps
WHERE
taps.campaign_id ='3615'
GROUP BY taps.action_id, taps.header.platform
""")
# printRDD calls count() and then fetches the rows, so cache the result first.
data = data.cache()
printRDD(data)
## Find conversion events for campaign 3154
# The conversion events of interest, as (property ID, action ID) pairs:
# 1525 D-ACQ-TAPAD-CONVERSION
# 1525 DC-Tapad-Conv-2015
# 1525 D-ACQ-TAPAD-RETARGETING
# Point to tracked: load the tracked events and register them as table "tracked"
tracked = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/tracked-events/")
tracked.registerTempTable("tracked")
# Point to taps: this re-binds `taps` and the "taps" table name to the campaign 3154 data
taps = sqlContext.parquetFile("/user/george.gemelos/campaignLog3154/data/taps")
taps.registerTempTable("taps")
# Count the tracked events per action ID, restricted to the three
# (property 1525, action) conversion pairs.
data = sqlContext.sql("""
SELECT tracked.action_id, count(*)
FROM tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY tracked.action_id
""")
# Cache before printing: printRDD runs more than one action on the result.
data = data.cache()
printRDD(data)
# Conversions with and without impressions.
# TE  : one row per (device storage key, action ID) among the conversion events.
# IMP : one row per device storage key that has an impression tap for campaign 3154.
# The left outer join keeps every TE row; "IMP.impDID is not null" tells
# whether a matching impression device was found.
data = sqlContext.sql("""
SELECT TE.aid, IMP.impDID is not null, count(*)
FROM
(
SELECT tracked.header.device.storage_key as did, tracked.action_id as aid
From tracked
WHERE (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY tracked.header.device.storage_key, tracked.action_id
) TE
LEFT OUTER JOIN
(
SELECT taps.header.device.storage_key as impDID
FROM taps
WHERE taps.campaign_id ='3154' AND taps.action_id = 'impression'
GROUP BY taps.header.device.storage_key
) IMP
ON TE.did = IMP.impDID
GROUP BY TE.aid, IMP.impDID is not null
""")
# Cache before printing: printRDD runs more than one action on the result.
data = data.cache()
printRDD(data)
# Conversions with and without impressions, additionally broken out by
# device platform.
# TE  : one row per (device storage key, platform, action ID) among the conversion events.
# IMP : one row per device storage key that has an impression tap for campaign 3154.
# The result is grouped by action, platform and the "has impression" flag,
# and ordered by action and platform.
data = sqlContext.sql("""
SELECT TE.aid, TE.plat, IMP.impDID is not null, count(*)
FROM
(
SELECT
tracked.header.device.storage_key as did, tracked.header.device.platform as plat, tracked.action_id as aid
FROM tracked
WHERE
(tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-CONVERSION')
OR (tracked.property_id = '1525' AND tracked.action_id = 'DC-Tapad-Conv-2015')
OR (tracked.property_id = '1525' AND tracked.action_id = 'D-ACQ-TAPAD-RETARGETING')
GROUP BY
tracked.header.device.storage_key, tracked.header.device.platform, tracked.action_id
) TE
LEFT OUTER JOIN
(
SELECT
taps.header.device.storage_key as impDID
FROM taps
WHERE
taps.campaign_id ='3154' AND taps.action_id = 'impression'
GROUP BY
taps.header.device.storage_key
) IMP
ON TE.did = IMP.impDID
GROUP BY TE.aid, TE.plat, IMP.impDID is not null
ORDER BY TE.aid, TE.plat
""")
# Cache before printing: printRDD runs more than one action on the result.
data = data.cache()
printRDD(data)
###
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment