package tags

import frameless.ops.SmartProject
import frameless.{TypedColumn, TypedDataset}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import scala.{Array, Int, Unit}
import scala.Predef.{require, String}

/** Demonstrates frameless `SmartProject` with untagged type aliases, including
  * projection down to a unary case class (`A`).
  *
  * Every alias below is a plain alias of `String` or `Int`, so the compiler treats
  * e.g. `AId`, `BId`, `CId` and `DId` as the same type; the aliases only document intent.
  *
  * An explicit `main` is used instead of `extends App`: Spark's documentation warns that
  * subclasses of `scala.App` may not work correctly (`App` defers its body via `DelayedInit`).
  */
object s3 {
  type Name = String
  def name(id: String): Name = id
  type Size = Int
  def size(n: Int): Size = n
  type Length = Int
  def length(n: Int): Length = n
  type AId = String
  def aid(id: String): AId = id
  type BId = String
  def bid(id: String): BId = id
  type CId = String
  def cid(id: String): CId = id
  type DId = String
  def did(id: String): DId = id

  // Case classes for TypedDatasets. They are object members (not local to `main`) so that
  // encoders can be resolved for them.
  case class A(id: AId)
  case class B(id: BId, name: Name)
  case class B1(id: BId)
  case class C(id: CId, name: Name, size: Size)
  case class D(id: DId, length: Length)

  /** Hand-written C => B projection: keeps `id` (cast from `CId` to `BId`) and `name`,
    * and drops `size`.
    */
  implicit val C2BSmartProject: SmartProject[C, B] = SmartProject[C, B]((x: TypedDataset[C]) => {
    val x_id: TypedColumn[C, BId] = x.col[CId]('id).cast[BId]
    val x_name: TypedColumn[C, Name] = x.col[Name]('name)
    x.select[BId, Name](x_id, x_name).as[B]
  })

  // Hand-written projections onto the unary case class A do NOT compile. Uncommenting
  // either definition below fails with:
  //   could not find implicit value for parameter as: frameless.ops.As[tags.s3.AId,tags.s3.A]
  //   not enough arguments for method as:
  //     (implicit as: frameless.ops.As[tags.s3.AId,tags.s3.A])frameless.TypedDataset[tags.s3.A].
  //   Unspecified value parameter as.
  // That is, the single-column `x.select[AId](x_id)` is a TypedDataset[AId], and no
  // As[AId, A] instance is found to turn it into a TypedDataset[A]. The `project[A]` calls
  // in `main`, which use no hand-written instance, compile and run.
  //
  // implicit val B2ASmartProject: SmartProject[B, A] = SmartProject[B, A]((x: TypedDataset[B]) => {
  //   val x_id: TypedColumn[B, AId] = x.col[BId]('id).cast[AId]
  //   x.select[AId](x_id).as[A]
  // })
  //
  // implicit val D2ASmartProject: SmartProject[D, A] = SmartProject[D, A]((x: TypedDataset[D]) => {
  //   val x_id: TypedColumn[D, AId] = x.col[DId]('id).cast[AId]
  //   x.select[AId](x_id).as[A]
  // })

  /** Builds small B/C/D datasets on a local Spark session, projects and unions them,
    * and checks the collected row counts with `require`.
    *
    * The session is always stopped, even when a `require` check fails.
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    implicit val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // NOTE(review): kept alongside `spark` in case the frameless version in use resolves an
    // implicit SQLContext rather than a SparkSession — TODO confirm which one is required.
    implicit val sqlContext: SQLContext = spark.sqlContext
    // Provides the SparkDelay instance needed by `.run()` on collected jobs.
    import frameless.syntax.DefaultSparkDelay

    try {
      val bs: TypedDataset[B] = TypedDataset.create[B](Seq(
        B(bid("12345"), name("Mark")),
        B(bid("23456"), name("Jane"))
      ))

      val cs: TypedDataset[C] = TypedDataset.create[C](Seq(
        C(cid("02468"), name("Table"), size(8)),
        C(cid("13579"), name("Chair"), size(4))
      ))

      val ds: TypedDataset[D] = TypedDataset.create[D](Seq(
        D(did("03846"), length(2)),
        D(did("19384"), length(3))
      ))

      // cs.project[B] uses the hand-written C2BSmartProject above.
      val bcs: TypedDataset[B] = bs.union(cs.project[B])
      val bcs_collection = bcs.collect().run()
      require(bcs_collection.size == 4)

      // No hand-written SmartProject exists in this file for B => B1 or B1 => A; frameless
      // supplies those itself. Because BId and AId are both aliases of String, the `id`
      // fields line up by name and type.
      val b1s: TypedDataset[B1] = bcs.project[B1]
      val b1s_collection = b1s.collect().run()
      require(b1s_collection.size == 4)

      val ab1s: TypedDataset[A] = b1s.project[A]
      val ab1s_collection = ab1s.collect().run()
      require(ab1s_collection.size == 4)

      // Likewise B => A and D => A: 4 rows from bcs plus 2 rows from ds.
      val all: TypedDataset[A] = bcs.project[A].union(ds.project[A])
      val all_collection = all.collect().run()
      require(all_collection.size == 6)
    } finally {
      // Release the local Spark session even if a check above throws.
      spark.stop()
    }
  }
}