Created
October 1, 2018 20:27
-
-
Save joao-parana/b9bf9aac71ea9a4c5d86bbb938f2c3b4 to your computer and use it in GitHub Desktop.
Revisions
-
joao-parana created this gist
Oct 1, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,91 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.SparkContext import org.apache.log4j.{Level, Logger} // A sparkSession é provida pelo proprio Spark Shell // O nivel de log também já é configurado pela Spark Shell def boolean_udf_wrapper(a:String, b:String, t:Any): Boolean = { true } def string_udf_wrapper(a:String, b:String, t:Any): String = { "••••" } import org.apache.spark.sql.functions.expr import org.apache.spark.sql.functions.sum import org.apache.spark.sql.catalyst.dsl.plans.table import org.apache.spark.sql.catalyst.dsl.expressions.{sum,max,min,first,last,count,avg} // // O código acima é constante. A parte mutável aparece abaixo. // import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, LongType, StructType} class MyCountUDAF extends UserDefinedAggregateFunction { // UserDefinedAggregateFunction is the contract to define // user-defined aggregate functions (UDAFs). // Este método abaixo define pode ser invocado apenas assim: inputSchema(0) // Isto é feito via inversão de dependência pelo Spark // o retorno é um objeto StructField assim: // StructField("id", LongType, true, {}) // o objeto StructField é do pacote org.apache.spark.sql.types override def inputSchema: StructType = { new StructType().add("id", LongType, nullable = true) } // O buffer para resultado temporário possui um único atributo // no caso da funcionalidade de contagem. // Este método abaixo define pode ser invocado apenas assim: bufferSchema(0) // Isto é feito via inversão de dependência pelo Spark // o retorno é um objeto StructField assim: // StructField("count", LongType, true, {}) override def bufferSchema: StructType = { new StructType().add("count", LongType, nullable = true) } // O método abaixo deve ser invocado sem parênteses em Scala. // refere-se ao tipo do atributo de saida override def dataType: DataType = LongType override def deterministic: Boolean = true // O método abaixo inicializa o buffer. // Isto é feito via inversão de dependência pelo Spark // Observe que a única coisa a ser feita é inicializar o contador com Zero. override def initialize(buffer: MutableAggregationBuffer): Unit = { println(s">>> initialize (buffer: $buffer)") // NOTE: Scala's update used under the covers buffer(0) = 0L } override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { println(s">>> update (buffer: $buffer -> input: $input)") buffer(0) = buffer.getLong(0) + 1 } override def merge(buffer: MutableAggregationBuffer, row: Row): Unit = { println(s">>> merge (buffer: $buffer -> row: $row)") buffer(0) = buffer.getLong(0) + row.getLong(0) } override def evaluate(buffer: Row): Any = { println(s">>> evaluate (buffer: $buffer)") buffer.getLong(0) } } // Criando o objeto MyCountUDAF para ser usada com a API de Dataset/DataFrame val myCountUDAF = new MyCountUDAF // case class R7_Tuple(deptId:Long,deptName:String){} val R7_Dataset = spark.read.json("DATA/depto.json").as[R7_Tuple] case class R6_Tuple(deptId: Long, deptName: String){} val R6_Dataset = R7_Dataset.filter(t => boolean_udf_wrapper("scala", "oldestDeptos", t)).as[R6_Tuple] case class R8_Tuple(deptId:Long){} val R8_Dataset = spark.read.json("DATA/depto_ids.json").as[R8_Tuple] case class R2_Tuple( deptId:Long, deptName:String){} val R2_Dataset = R8_Dataset.join(R6_Dataset, "deptId").as[R2_Tuple] case class R0_Tuple(deptId:Long,name:String,salary:Double){} val R0_Dataset = spark.read.json("DATA/employees.json").as[R0_Tuple] case class R1_Tuple( deptId: Long, name: String, salary: Double, nameSmartCased: String ){} val R1_Dataset = R0_Dataset.map(t => R1_Tuple(t.deptId, t.name, t.salary, string_udf_wrapper("scala", "smartTextCase", t) )) case class R3_Tuple( deptId:Long, name:String, salary:Double, nameSmartCased:String, deptName: String){} val R3_Dataset = R1_Dataset.join(R2_Dataset, "deptId").as[R3_Tuple] case class R4_Tuple(deptId: Long, name: String, salary: Double, nameSmartCased: String, deptName: String){} val R4_Dataset = R3_Dataset.filter(t => boolean_udf_wrapper("scala", "happyEmployees", t)).as[R4_Tuple] case class R5_Tuple( deptId: Long, sum_salary: Double ){} val R5_Dataset = R4_Dataset.groupBy("deptId"). agg(expr("sum(salary)").alias("sum_salary")).as[R5_Tuple] // val agregated_1 = R4_Dataset.groupBy('deptId).agg(myCountUDAF('name) as "count") agregated_1.show(10) val agregated_2 = R4_Dataset.groupBy('deptId).agg(myCountUDAF.distinct('salary) as "count") agregated_2.show(10) R5_Dataset.show(10)