Skip to content

Instantly share code, notes, and snippets.

@yoongkang0122
Forked from vikas-gonti/Solutions
Created December 8, 2018 05:50
Show Gist options
  • Select an option

  • Save yoongkang0122/b8e2b98e84d4c14a4916fb59cb19772c to your computer and use it in GitHub Desktop.

Select an option

Save yoongkang0122/b8e2b98e84d4c14a4916fb59cb19772c to your computer and use it in GitHub Desktop.
Solution to 20 questions at http://nn02.itversity.com/cca175/
:'Problem 1
#Connect to the MySQL database on the itversity labs using sqoop and import all of the data from the orders table into HDFS
Output Requirements
#Place the customer files in the HDFS directory
#/user/yourusername/problem1/solution/
#Replace yourusername with your OS user name
#Use a text format with comma as the columnar delimiter
#Load every order record completely'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--table orders \
--target-dir /user/vikasgonti/itversity/problem1/solution/
:'#Problem 2
#Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
Output Requirements
#Target Columns: customer_lname, customer_fname
#Number of Files: 1
#Place the output file in the HDFS directory
#/user/yourusername/problem2/solution/
#Replace yourusername with your OS user name
#File format should be text
#delimiter is (",")
#Compression: Uncompressed
'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/retail_db \
--username retail_user \
--password itversity \
--target-dir /user/vikasgonti/itversity/problem2/solution/ \
-m 1 \
--query "select customer_lname, customer_fname from customers left outer join orders on customer_id = order_customer_id where \$CONDITIONS and order_customer_id is null order by customer_lname, customer_fname"
:'#Problem 3
#Get top 3 crime types based on number of incidents in RESIDENCE area using "Location Description"
Output Requirements
#Output Fields: crime_type, incident_count
#Output File Format: JSON
#Delimiter: N/A
#Compression: No
#Place the output file in the HDFS directory
#/user/yourusername/problem3/solution/
#Replace yourusername with your OS user name'
val crimeData = sc.textFile("/public/crime/csv")
val crimeDataHeader = crimeData.first
val crimeDatawithoutHeader = crimeData.filter(rec => rec!=crimeDataHeader)
crimeDatawithoutHeader.take(10).foreach(println)
val crimeRDD = crimeDatawithoutHeader.map(rec => {
val t = rec.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)
(t(5),t(7))
})
val crimeDF = crimeRDD.toDF("type","location")
crimeDF.registerTempTable("crime");
val res = sqlContext.sql("select * from (select type as crime_type, count(1) as incident_count from crime "+
"where location = 'RESIDENCE' group by type order by incident_count desc) A limit 3");
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem3/solution/")
:'#Problem 4
#Convert NYSE data into parquet
Output Requirements
#Column Names: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Convert file format to parquet
#Place the output file in the HDFS directory
#/user/yourusername/problem4/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/user/vikasgonti/data/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6))
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
nyseDF.write.parquet("/user/vikasgonti/itversity/problem4/solution/")
:'#problem 5
#Get word count for the input data using space as delimite
Output Requirements
#Output File format: Avro
#Output fields: word, count
#Compression: Uncompressed
#Place the customer files in the HDFS directory
#/user/yourusername/problem5/solution/
#Replace yourusername with your OS user name'
spark-shell --master yarn \
--conf spark.ui.port=12456 \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1
val wordsRDD = sc.textFile("/public/randomtextwriter")
val wordsFlat = wordsRDD.flatMap(rec => rec.split(" "))
val wordsMap = wordsFlat.map(rec => (rec,1))
val wordsCount = wordsMap.reduceByKey((t,v) => t+v,8)
val wordsDF = wordsCount.toDF("word","count")
wordsDF.write.avro("/user/vikasgonti/itversity/problem5/solution/")
:'#problem 6
#Get total number of orders for each customer where the cutomer_state = 'TX'
Output Requirements
#Output Fields: customer_fname, customer_lname, order_count
#File Format: text
#Delimiter: Tab character (\t)
#Place the result file in the HDFS directory
#/user/yourusername/problem6/solution/
#Replace yourusername with your OS user name'
val orders = sc.textFile("/public/retail_db/orders")
val customers = sc.textFile("/public/retail_db/customers")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(2))
}).toDF("order_id","order_customer_id")
val customersDF = customers.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2), t(7))
}).toDF("customer_id","customer_fname","customer_lname","customer_state")
ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")
val res = sqlContext.sql("select customer_fname, customer_lname, count(order_id) order_count "+
"from customers, orders where customer_id = order_customer_id and customer_state = 'TX' "+
"group by customer_fname, customer_lname ")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem6/solution/")
:'#problem 7
#List the names of the Top 5 products by revenue ordered on '2013-07-26'. Revenue is considered only for COMPLETE and CLOSED orders.
Output Requirements
#Target Columns: order_date, order_revenue, product_name, product_category_id
#Data has to be sorted in descending order by order_revenue
#File Format: text
#Delimiter: colon (:)
#Place the output file in the HDFS directory
#/user/yourusername/problem7/solution/
#Replace yourusername with your OS user name
'
val orders = sc.textFile("/public/retail_db/orders")
val orderItems = sc.textFile("/public/retail_db/order_items")
val products = sc.textFile("/public/retail_db/products")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0),t(1).split(" ")(0), t(3))
}).toDF("order_id","order_date","order_status")
val orderItemsDF = orderItems.map(rec => {
val t = rec.split(",")
(t(1),t(2), t(4))
}).toDF("order_item_order_id","order_item_product_id","order_item_subtotal")
val productsDF = products.map(rec => {
val t = rec.split(",")
(t(0),t(1),t(2))
}).toDF("product_id","product_category_id","product_name")
ordersDF.registerTempTable("orders")
orderItemsDF.registerTempTable("orderItems")
productsDF.registerTempTable("products")
val res = sqlContext.sql("select DISTINCT order_date, round(sum(order_item_subtotal) over (partition by product_id), 2) order_revenue, "+
"product_name,product_category_id from orders, products, orderItems where order_id = order_item_order_id "+
"and order_item_product_id = product_id and order_date ='2013-07-26' and order_status IN ('COMPLETE','CLOSED') "+
"order by order_revenue desc limit 5")
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem7/solution/")
:'#problem 8
#List the order Items where the order_status = PENDING PAYMENT order by order_id
Output Requirements
#Target columns: order_id, order_date, order_customer_id, order_status
#File Format: orc
#Place the output files in the HDFS directory
#/user/yourusername/problem8/solution/
#Replace yourusername with your OS user name'
val ordersRDD = sc.textFile("/public/retail_db/orders")
val ordersDF = orders.map(rec => {
val t = rec.split(",")
(t(0).toInt,t(1), t(2), t(3))
}).toDF("order_id","order_date","order_customer_id","order_status")
val res = ordersDF.filter("order_status = 'PENDING_PAYMENT'").orderBy("order_id")
res.write.orc("/user/vikasgonti/itversity/problem8/solution/")
:'#problem 9
#Remove header from h1b data
Output Requirements
#Remove the header from the data and save rest of the data as is
#Data should be compressed using snappy algorithm
#Place the H1B data in the HDFS directory
#/user/yourusername/problem9/solution/
#Replace yourusername with your OS user name
'
val h1bdata = sc.textFile("/public/h1b/h1b_data")
val h1bHeader = h1bdata.first
val h1bdatawithoutheader = h1bdata.filter(rec => rec!=h1bHeader)
h1bdatawithoutheader.saveAsTextFile("/user/vikasgonti/itversity/problem9/solution/",classOf[org.apache.hadoop.io.compress.SnappyCodec])
:'#problem 10
#Get number of LCAs filed for each year
Output Requirements
#File Format: text
#Output Fields: YEAR, NUMBER_OF_LCAS
#Delimiter: Ascii null "\0"
#Place the output files in the HDFS directory
#/user/yourusername/problem10/solution/
#Replace yourusername with your OS user name'
# ID CASE_STATUS EMPLOYER_NAME SOC_NAME JOB_TITLE FULL_TIME_POSITION PREVAILING_WAGE YEAR WORKSITE lon lat
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(7))
}).toDF("YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(1) as NUMBER_OF_LCAS from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\0")).saveAsTextFile("/user/vikasgonti/itversity/problem10/solution/")
:'#problem 11
#Get number of LCAs by status for the year 2016
Output Requirements
#File Format: json
#Output Field Names: year, status, count
#Place the output files in the HDFS directory
#/user/yourusername/problem11/solution/
#Replace yourusername with your OS user name'
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(7))
}).toDF("Status", "Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Year, Status, Count(1) as count from h1bdata where year != 'NA' and year = 2016 group by year, status")
res.toJSON.saveAsTextFile("/user/vikasgonti/itversity/problem11/solution/")
:'#problem 12
#Get top 5 employers for year 2016 where the status is WITHDRAWN or CERTIFIED-WITHDRAWN or DENIED
Output Requirements
#File Format: parquet
#Output Fields: employer_name, lca_count
#Data needs to be in descending order by count
#Place the output files in the HDFS directory
#/user/yourusername/problem12/solution/
#Replace yourusername with your OS user name'
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(1),t(2),t(7))
}).toDF("Status","Employer","Year")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select Employer employer_name, count(1) lca_count from h1bdata where year != 'NA' and year = 2016 "+
"and status IN ('WITHDRAWN','CERTIFIED-WITHDRAWN','DENIED') group by Employer order by lca_count desc limit 5")
res.write.parquet("/user/vikasgonti/itversity/problem12/solution/")
:'#problem 13
#Copy all h1b data from HDFS to Hive table excluding those where year is NA or prevailing_wage is NA
Output Requirements
#Save it in Hive Database
#Create Database: CREATE DATABASE IF NOT EXISTS yourusername
#Switch Database: USE yourusername
#Save data to hive table h1b_data
#Create table command:
CREATE TABLE h1b_data (
ID INT,
CASE_STATUS STRING,
EMPLOYER_NAME STRING,
SOC_NAME STRING,
JOB_TITLE STRING,
FULL_TIME_POSITION STRING,
PREVAILING_WAGE DOUBLE,
YEAR INT,
WORKSITE STRING,
LONGITUDE STRING,
LATITUDE STRING
)
Replace yourusername with your OS user name'
val h1bdatawithoutheader = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdatawithoutheader.map(rec => {
val t = rec.split("\0")
(t(0),t(1),t(2),t(3),t(4),t(5),t(6),t(7),t(8),t(9),t(10))
}).toDF("ID","CASE_STATUS","EMPLOYER_NAME","SOC_NAME","JOB_TITLE","FULL_TIME_POSITION","PREVAILING_WAGE","YEAR","WORKSITE","LONGITUDE","LATITUDE")
sqlContext.sql("use vghivedatabase")
sqlContext.sql("show tables").show
h1bDF.registerTempTable("h1bdata")
sqlContext.sql("insert into h1b_data select * from h1bdata where year != 'NA' and PREVAILING_WAGE!='NA'")
:'#problem 14
#Export h1b data from hdfs to MySQL Database
Output Requirements
#Export data to MySQL Database
#MySQL database is running on ms.itversity.com
#User: h1b_user
#Password: itversity
Database Name: h1b_export
Table Name: h1b_data_yourusername
Nulls are represented as: NA
After export nulls should not be stored as NA in database. It should be represented as database null
Create table command:
CREATE TABLE h1b_data_yourusername (
ID INT,
CASE_STATUS VARCHAR(50),
EMPLOYER_NAME VARCHAR(100),
SOC_NAME VARCHAR(100),
JOB_TITLE VARCHAR(100),
FULL_TIME_POSITION VARCHAR(50),
PREVAILING_WAGE FLOAT,
YEAR INT,
WORKSITE VARCHAR(50),
LONGITUDE VARCHAR(50),
LATITUDE VARCHAR(50));
#Replace yourusername with your OS user name
#Above create table command can be run using
#Login using mysql -u h1b_user -h ms.itversity.com -p
#When prompted enter password itversity
#Switch to database using use h1b_export
#Run above create table command by replacing yourusername with your OS user name'
sqoop export \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_export \
--username h1b_user \
--password itversity \
--table h1b_data_vg \
--export-dir /public/h1b/h1b_data_to_be_exported \
--input-fields-terminated-by '\001' \
--input-null-string 'NA'
:'#problem 15
#Connect to the MySQL database on the itversity labs using sqoop and import data with case_status as CERTIFIED
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem15/solution/
#Replace yourusername with your OS user name
#Use avro file format
#Load only those records which have case_status as CERTIFIED completely
#There are 2615623 such records'
sqoop import \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--table h1b_data \
--where "CASE_STATUS = 'CERTIFIED'" \
--target-dir /user/vikasgonti/itversity/problem15/solution/ \
--as-avrodatafile
:'#problem 16
#Get NYSE data in ascending order by date and descending order by volume
Output Requirements
#Save data back to HDFS
#Column order: stockticker, transactiondate, openprice, highprice, lowprice, closeprice, volume
#File Format: text
#Delimiter: :
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem16/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
val res = nyseDF.orderBy(col("transactiondate"),col("volume").desc)
res.map(rec => rec.mkString(":")).saveAsTextFile("/user/vikasgonti/itversity/problem16/solution/")
:'#problem 17
#Get the stock tickers from NYSE data for which full name is missing in NYSE symbols data
Output Requirements
#Get unique stock ticker for which corresponding names are missing in NYSE symbols data
#Save data back to HDFS
#File Format: avro
#Avro dependency details:
#groupId -> com.databricks, artifactId -> spark-avro_2.10, version -> 2.0.1
#Place the sorted NYSE data in the HDFS directory
#/user/yourusername/problem17/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString)
}).toDF("stockticker")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString)
}).toDF("stocksymbol")
symDataDF.registerTempTable("symData")
val res = sqlContext.sql("select DISTINCT stockticker as Symbol from nyse n "+
"left outer join symData s on n.stockticker = s.stocksymbol and s.stocksymbol is null")
import com.databricks.spark.avro._
res.write.avro("/user/vikasgonti/itversity/problem17/solution/")
:'#problem 18
#Get the name of stocks displayed along with other information
Output Requirements
#Get all NYSE details along with stock name if exists, if not stockname should be empty
#Column Order: stockticker, stockname, transactiondate, openprice, highprice, lowprice, closeprice, volume
#Delimiter: ,
#File Format: text
#Place the data in the HDFS directory
#/user/yourusername/problem18/solution/
#Replace yourusername with your OS user name'
val nyseRDD = sc.textFile("/public/nyse")
val nyseDF = nyseRDD.map(rec => {
val t = rec.split(",")
(t(0).toString,t(1).toInt,t(2).toFloat,t(3).toFloat,t(4).toFloat,t(5).toFloat,t(6).toInt)
}).toDF("stockticker","transactiondate","openprice","highprice","lowprice","closeprice","volume")
nyseDF.registerTempTable("nyse")
val nysesymRDD = sc.textFile("/public/nyse_symbols")
val first = nysesymRDD.first
val symDataDF = nysesymRDD.filter(rec => rec!= first).map(rec => {
val t = rec.split("\t")
(t(0).toString, t(1).toString)
}).toDF("stocksymbol", "stockname")
symDataDF.registerTempTable("symData")
val res = sqlContext.sql("select stockticker, nvl(stockname,'') stockname, transactiondate, openprice, "+
"highprice, lowprice, closeprice, volume "+
"from nyse n left outer join symData s on n.stockticker = s.stocksymbol")
res.map(rec => rec.mkString(",")).saveAsTextFile("/user/vikasgonti/itversity/problem18/solution/")
:'#problem 19
#Get number of companies who filed LCAs for each year
Output Requirements
#File Format: text
#Delimiter: tab character "\t"
#Output Field Order: year, lca_count
#Place the output files in the HDFS directory
#/user/yourusername/problem19/solution/
#Replace yourusername with your OS user name'
val h1bdata = sc.textFile("/public/h1b/h1b_data_noheader")
val h1bDF = h1bdata.map(rec => {
val t = rec.split("\0")
(t(2),t(7))
}).toDF("EMPLOYER_NAME","YEAR")
h1bDF.registerTempTable("h1bdata")
val res = sqlContext.sql("select YEAR, count(EMPLOYER_NAME) as lca_count from h1bdata where year != 'NA' group by year")
res.map(rec => rec.mkString("\t")).saveAsTextFile("/user/vikasgonti/itversity/problem19/solution/")
:'#problem 20
#using sqoop and import data with employer_name, case_status and count.
#Make sure data is sorted by employer_name in ascending order and by count in descending order
Output Requirements
#Place the h1b related data in files in HDFS directory
#/user/yourusername/problem20/solution/
#Replace yourusername with your OS user name
#Use text file format and tab (\t) as delimiter
#Hint: You can use Spark with JDBC or Sqoop import with query
#You might not get such hints in actual exam
#Output should contain employer name, case status and count
'
sqoop eval \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select count(*) from (select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data group by EMPLOYER_NAME, CASE_STATUS) A"
sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://ms.itversity.com:3306/h1b_db \
--username h1b_user \
--password itversity \
--query "select EMPLOYER_NAME, CASE_STATUS, count(1) as count from h1b_data where \$CONDITIONS group by EMPLOYER_NAME, CASE_STATUS order by EMPLOYER_NAME, count desc" \
--target-dir /user/vikasgonti/itversity/problem20/solution/ \
--as-textfile \
--fields-terminated-by '\t' \
--split-by case_status
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment