package tags

import frameless.ops.SmartProject
import frameless.{TypedColumn, TypedDataset}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession

import scala.Predef.require

/** Untagged types with derived projections.
  *
  * Every `*Id`, `Name`, `Size` and `Length` below is a plain type alias (`String` / `Int`),
  * so the compiler does not distinguish them. The `ATag`..`DTag` hierarchy is declared
  * but not attached to any alias; it only records the intended relationship between the
  * record types (C refines B refines A, D refines A).
  *
  * The Spark work lives in an explicit `main` instead of `extends App`. Spark's
  * documentation warns that subclasses of `scala.App` may not work correctly, because
  * `App` defers object-field initialization via `DelayedInit`.
  */
object s4 {
  type Name = String
  def name(id: String): Name = id

  type Size = Int
  def size(n: Int): Size = n

  type Length = Int
  def length(n: Int): Length = n

  // Hierarchy of tags (declared only; the aliases below do not reference it)
  sealed trait ATag
  sealed trait BTag extends ATag
  sealed trait CTag extends BTag
  sealed trait DTag extends ATag

  type AId = String
  def aid(id: String): AId = id

  type BId = String
  def bid(id: String): BId = id

  type CId = String
  def cid(id: String): CId = id

  type DId = String
  def did(id: String): DId = id

  // Case classes for TypedDatasets
  case class A(id: AId)
  case class B(id: BId, name: Name)
  case class B1(id: BId)
  case class C(id: CId, name: Name, size: Size)
  case class D(id: DId, length: Length)

  /** Explicit projection of `C` onto `B`.
    *
    * Keeps `id` (cast from `CId` to `BId`) and `name`, and drops `size`. Because it is
    * declared in this object's lexical scope, `cs.project[B]` in `main` picks this
    * instance rather than searching `SmartProject`'s implicit scope.
    */
  implicit val C2BSmartProject: SmartProject[C, B] = SmartProject[C, B](
    (x: TypedDataset[C]) => {
      val idCol: TypedColumn[C, BId] = x.col[CId]('id).cast[BId]
      val nameCol: TypedColumn[C, Name] = x.col[Name]('name)
      x.select[BId, Name](idCol, nameCol).as[B]
    })

  /** Builds small datasets, projects and unions them, and checks the resulting row counts.
    *
    * Uses a local-master SparkSession, which is always stopped on exit, including when a
    * `require` check fails.
    *
    * @param args unused
    * @throws IllegalArgumentException if any row-count check fails
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    implicit val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // NOTE(review): kept from the original; presumably needed by frameless versions whose
    // APIs take an implicit SQLContext — may be unused with newer versions.
    implicit val sqlContext: SQLContext = spark.sqlContext
    import frameless.syntax.DefaultSparkDelay

    try {
      val bs: TypedDataset[B] = TypedDataset.create[B](Seq(
        B(bid("12345"), name("Mark")),
        B(bid("23456"), name("Jane"))
      ))

      val cs: TypedDataset[C] = TypedDataset.create[C](Seq(
        C(cid("02468"), name("Table"), size(8)),
        C(cid("13579"), name("Chair"), size(4))
      ))

      val ds: TypedDataset[D] = TypedDataset.create[D](Seq(
        D(did("03846"), length(2)),
        D(did("19384"), length(3))
      ))

      // 2 B rows plus 2 C rows projected to B (via C2BSmartProject).
      val bcs: TypedDataset[B] = bs.union(cs.project[B])
      val bcsRows = bcs.collect().run()
      require(bcsRows.size == 4, s"expected 4 rows in bcs, got ${bcsRows.size}")

      val b1s: TypedDataset[B1] = bcs.project[B1]
      val b1Rows = b1s.collect().run()
      require(b1Rows.size == 4, s"expected 4 rows in b1s, got ${b1Rows.size}")

      val ab1s: TypedDataset[A] = b1s.project[A]
      val ab1Rows = ab1s.collect().run()
      require(ab1Rows.size == 4, s"expected 4 rows in ab1s, got ${ab1Rows.size}")

      // 4 rows from bcs plus 2 from ds, all projected down to A.
      val all: TypedDataset[A] = bcs.project[A].union(ds.project[A])
      val allRows = all.collect().run()
      require(allRows.size == 6, s"expected 6 rows in all, got ${allRows.size}")
    } finally {
      // Release the session (and its local executor) even when a check above fails.
      spark.stop()
    }
  }
}