Skip to content

Instantly share code, notes, and snippets.

@truongtpa
Created April 6, 2023 17:05
Show Gist options
  • Select an option

  • Save truongtpa/8dbbd7194325f2f231a776132e01f6f5 to your computer and use it in GitHub Desktop.

Select an option

Save truongtpa/8dbbd7194325f2f231a776132e01f6f5 to your computer and use it in GitHub Desktop.
Program snippets
```
package com.truongtpa
package Study2b
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object Scenario4 {

  /**
   * Benchmark scenario (study 2b): build a Bloom filter from one S3 dataset
   * (~70GB, file parts 16..29) and use it to count probable matches in a
   * second S3 dataset (~80GB, file parts 00..15).
   *
   * Note: Bloom filters admit false positives, so the final count is an
   * upper bound on the true number of common items.
   */
  def main(args: Array[String]): Unit = {
    val appName = "scenario4-study2b"
    val spark = SparkSession.builder()
      // .master("local[*]") // uncomment for local runs
      .config("spark.executor.memory", "12g")
      .config("spark.driver.maxResultSize", "160g")
      .appName(appName)
      .getOrCreate()
    val sc = spark.sparkContext

    // Dataset 01 (~70GB): parts 16..29, zero-padded file names ("16".."29").
    // sc.union over the whole sequence keeps the lineage flat; the original
    // loop of pairwise unions builds a deep, slow-to-serialize lineage chain.
    val rddL: RDD[String] =
      sc.union((16 to 29).map(index => Tools.readS3A(sc, f"$index%02d")))

    // Build the Bloom filter from dataset 01 on the driver, then broadcast
    // it so it is shipped once per executor instead of being serialized
    // into every task closure.
    val bfBroadcast = sc.broadcast(Tools.rdd2BF(rddL))

    // Dataset 02 (~80GB): parts 00..15; count items probably present in
    // dataset 01 according to the Bloom filter.
    val countRS: Long = (0 to 15).map { index =>
      Tools.readS3A(sc, f"$index%02d")
        .filter(item => bfBroadcast.value.contains(item))
        .count()
    }.sum

    // Same bytes on stdout as the original print("\nResult: " + n + "\n\n").
    println(s"\nResult: $countRS\n")
    sc.stop()
  }
}
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment