Skip to content

Instantly share code, notes, and snippets.

@joao-parana
Created October 1, 2018 20:27
Show Gist options
  • Select an option

  • Save joao-parana/b9bf9aac71ea9a4c5d86bbb938f2c3b4 to your computer and use it in GitHub Desktop.

Select an option

Save joao-parana/b9bf9aac71ea9a4c5d86bbb938f2c3b4 to your computer and use it in GitHub Desktop.

Revisions

  1. joao-parana created this gist Oct 1, 2018.
    91 changes: 91 additions & 0 deletions W1.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,91 @@
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.SparkSession.Builder
    import org.apache.spark.SparkContext
    import org.apache.log4j.{Level, Logger}
    // A sparkSession é provida pelo proprio Spark Shell
    // O nivel de log também já é configurado pela Spark Shell
    def boolean_udf_wrapper(a:String, b:String, t:Any): Boolean = { true }
    def string_udf_wrapper(a:String, b:String, t:Any): String = { "••••" }
    import org.apache.spark.sql.functions.expr
    import org.apache.spark.sql.functions.sum
    import org.apache.spark.sql.catalyst.dsl.plans.table
    import org.apache.spark.sql.catalyst.dsl.expressions.{sum,max,min,first,last,count,avg}
    //
    // O código acima é constante. A parte mutável aparece abaixo.
    //
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, LongType, StructType}
    class MyCountUDAF extends UserDefinedAggregateFunction {
    // UserDefinedAggregateFunction is the contract to define
    // user-defined aggregate functions (UDAFs).
    // Este método abaixo define pode ser invocado apenas assim: inputSchema(0)
    // Isto é feito via inversão de dependência pelo Spark
    // o retorno é um objeto StructField assim:
    // StructField("id", LongType, true, {})
    // o objeto StructField é do pacote org.apache.spark.sql.types
    override def inputSchema: StructType = {
    new StructType().add("id", LongType, nullable = true)
    }
    // O buffer para resultado temporário possui um único atributo
    // no caso da funcionalidade de contagem.
    // Este método abaixo define pode ser invocado apenas assim: bufferSchema(0)
    // Isto é feito via inversão de dependência pelo Spark
    // o retorno é um objeto StructField assim:
    // StructField("count", LongType, true, {})
    override def bufferSchema: StructType = {
    new StructType().add("count", LongType, nullable = true)
    }
    // O método abaixo deve ser invocado sem parênteses em Scala.
    // refere-se ao tipo do atributo de saida
    override def dataType: DataType = LongType
    override def deterministic: Boolean = true
    // O método abaixo inicializa o buffer.
    // Isto é feito via inversão de dependência pelo Spark
    // Observe que a única coisa a ser feita é inicializar o contador com Zero.
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
    println(s">>> initialize (buffer: $buffer)")
    // NOTE: Scala's update used under the covers
    buffer(0) = 0L
    }
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    println(s">>> update (buffer: $buffer -> input: $input)")
    buffer(0) = buffer.getLong(0) + 1
    }
    override def merge(buffer: MutableAggregationBuffer, row: Row): Unit = {
    println(s">>> merge (buffer: $buffer -> row: $row)")
    buffer(0) = buffer.getLong(0) + row.getLong(0)
    }
    override def evaluate(buffer: Row): Any = {
    println(s">>> evaluate (buffer: $buffer)")
    buffer.getLong(0)
    }
    }
    // Criando o objeto MyCountUDAF para ser usada com a API de Dataset/DataFrame
    val myCountUDAF = new MyCountUDAF
    //
    case class R7_Tuple(deptId:Long,deptName:String){}
    val R7_Dataset = spark.read.json("DATA/depto.json").as[R7_Tuple]
    case class R6_Tuple(deptId: Long, deptName: String){}
    val R6_Dataset = R7_Dataset.filter(t => boolean_udf_wrapper("scala", "oldestDeptos", t)).as[R6_Tuple]
    case class R8_Tuple(deptId:Long){}
    val R8_Dataset = spark.read.json("DATA/depto_ids.json").as[R8_Tuple]
    case class R2_Tuple( deptId:Long, deptName:String){}
    val R2_Dataset = R8_Dataset.join(R6_Dataset, "deptId").as[R2_Tuple]
    case class R0_Tuple(deptId:Long,name:String,salary:Double){}
    val R0_Dataset = spark.read.json("DATA/employees.json").as[R0_Tuple]
    case class R1_Tuple( deptId: Long, name: String, salary: Double, nameSmartCased: String ){}
    val R1_Dataset = R0_Dataset.map(t => R1_Tuple(t.deptId, t.name, t.salary, string_udf_wrapper("scala", "smartTextCase", t) ))
    case class R3_Tuple( deptId:Long, name:String, salary:Double, nameSmartCased:String, deptName: String){}
    val R3_Dataset = R1_Dataset.join(R2_Dataset, "deptId").as[R3_Tuple]
    case class R4_Tuple(deptId: Long, name: String, salary: Double, nameSmartCased: String, deptName: String){}
    val R4_Dataset = R3_Dataset.filter(t => boolean_udf_wrapper("scala", "happyEmployees", t)).as[R4_Tuple]
    case class R5_Tuple( deptId: Long, sum_salary: Double ){}
    val R5_Dataset = R4_Dataset.groupBy("deptId").
    agg(expr("sum(salary)").alias("sum_salary")).as[R5_Tuple]
    //
    val agregated_1 = R4_Dataset.groupBy('deptId).agg(myCountUDAF('name) as "count")
    agregated_1.show(10)
    val agregated_2 = R4_Dataset.groupBy('deptId).agg(myCountUDAF.distinct('salary) as "count")
    agregated_2.show(10)
    R5_Dataset.show(10)