Skip to content

Instantly share code, notes, and snippets.

REGISTER '../udfs/jython/actions_taken.py' USING jython AS actions_taken;
REGISTER '../udfs/python/actions_taken.py' USING streaming_python AS actions_taken1;
raw = load '$OUTPUT_PATH/extract-actions-taken'
using PigStorage()
as (
user_id:chararray,
visitor_id:chararray,
client_id:chararray,
last_modified:chararray,
set mongo.input.query {"date":{"\$gt":{"\$date":$MAX_DATE}}}
set mongo.input.split.create_input_splits false
actions_taken =
LOAD '$BUFFER_METRICS_MONGO_URI.event.seamless.actions_taken'
USING com.mongodb.hadoop.pig.MongoLoader(
'user_id:chararray,
visitor_id:chararray,
client_id:chararray,
last_modified:chararray,
# You need to install scikit-learn:
# sudo pip install scikit-learn
#
# Dataset: Polarity dataset v2.0
# http://www.cs.cornell.edu/people/pabo/movie-review-data/
#
# Full discussion:
# https://marcobonzanini.wordpress.com/2015/01/19/sentiment-analysis-with-python-and-scikit-learn
  1. General Background and Overview
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
#data from http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
#Ask for these fields
#"DAY_OF_WEEK" (IN UI DayOfWeek)
#"FL_DATE" (FlightDate)
#"CARRIER" (Carrier)
#"ORIGIN_CITY_MARKET_ID" (OriginCityMarketID)
#"ORIGIN" (Origin)
#"CRS_DEP_TIME" (CRSDepTime)
#"DEP_DELAY" (DepDelay)
#"ARR_DELAY" (ArrDelay)
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg._
import org.apache.spark.{SparkConf, SparkContext}
// To use the latest sparse SVD implementation, please build your spark-assembly after this
// change: https://github.com/apache/spark/pull/1378
// Input tsv with 3 fields: rowIndex(Long), columnIndex(Long), weight(Double), indices start with 0
// Assume the number of rows is larger than the number of columns, and the number of columns is
// smaller than Int.MaxValue
$ ./bin/spark-shell
14/04/18 15:23:49 INFO spark.HttpServer: Starting HTTP Server
14/04/18 15:23:49 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/04/18 15:23:49 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49861
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 0.9.1
/_/
# data can be found at https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD
# or https://data.sfgov.org/Public-Safety/SFPD-Incidents-Previous-Three-Months/tmnf-yvry
import time
import matplotlib.colors as colors
import matplotlib.cm as cmx
from matplotlib import pyplot as plt
from matplotlib.patches import Patch
import numpy as np
import pandas
-- For every order, attach the customer's cohort date: the customer's
-- earliest transaction date, computed once per customer in the derived
-- table `c`. Each order row therefore carries the date its customer
-- first appeared, which downstream analysis can bucket cohorts by.
SELECT o.customerid,
       o.transactiondate,
       o.transactionamount,
       c.cohortdate
FROM   orders AS o
       INNER JOIN (SELECT customerid,
                          Min(transactiondate) AS cohortDate
                   FROM   orders
                   GROUP  BY customerid) AS c
               ON o.customerid = c.customerid;