package tags

import frameless.{TypedColumn, TypedDataset}
import frameless.ops.SmartProject
import frameless.{CatalystCast, Injection}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SQLContext}
import scala.{App, Int}
import scala.Predef.{require, String}

// Tagged types version
object s2 extends App {

  /**
   * This is a covariant version of Shapeless 2.3.2 tag.
   * @see https://github.com/milessabin/shapeless/blob/shapeless-2.3.2/core/src/main/scala/shapeless/typeoperators.scala#L25
   */
  object covariantTag {
    def apply[U] = new Tagger[U]

    trait Tagged[+U]

    class Tagger[U] {
      // The cast is safe: Tagged[U] has no members, so the runtime
      // representation of `T @@ U` is exactly `T`.
      def apply[T](t: T): T @@ U = t.asInstanceOf[T @@ U]
    }

    type @@[+T, +U] = T with Tagged[U]
  }

  import covariantTag.@@

  // NOTE: implicit definitions carry explicit result types — implicits with
  // inferred types are an implicit-resolution hazard in Scala 2 and are
  // rejected outright by Scala 3.
  implicit def intTagInjection[Tag]: Injection[Int @@ Tag, Int] =
    new Injection[Int @@ Tag, Int] {
      def apply(t: Int @@ Tag): Int = t
      def invert(s: Int): Int @@ Tag = covariantTag[Tag][Int](s)
    }

  implicit def stringTagInjection[Tag]: Injection[String @@ Tag, String] =
    new Injection[String @@ Tag, String] {
      def apply(t: String @@ Tag): String = t
      def invert(s: String): String @@ Tag = covariantTag[Tag][String](s)
    }

  // Phantom tags plus tagged type aliases and smart constructors.
  sealed trait NameTag
  type Name = String @@ NameTag
  def name(id: String): Name = covariantTag[NameTag][String](id)

  sealed trait SizeTag
  type Size = Int @@ SizeTag
  def size(n: Int): Size = covariantTag[SizeTag][Int](n)

  sealed trait LengthTag
  type Length = Int @@ LengthTag
  def length(n: Int): Length = covariantTag[LengthTag][Int](n)

  // Hierarchy of tags. (`abstract` is redundant on a trait and only produced
  // a compiler warning, so it has been dropped from ATag.)
  sealed trait ATag
  sealed trait BTag extends ATag
  sealed trait CTag extends BTag
  sealed trait DTag extends ATag

  type AId = String @@ ATag
  def aid(id: String): AId = covariantTag[ATag][String](id)

  type BId = String @@ BTag
  def bid(id: String): BId = covariantTag[BTag][String](id)

  type CId = String @@ CTag
  def cid(id: String): CId = covariantTag[CTag][String](id)

  type DId = String @@ DTag
  def did(id: String): DId = covariantTag[DTag][String](id)

  // CatalystCast instances to make the following work when Nothing <: U <: V <: Any|Object
  //   x.col[T @@ U]('field).cast[T @@ V]
  // One shared stateless instance is cast to each evidence type; this is safe
  // because the trait body is empty, so the value carries no type-specific state.
  private val theInstance = new CatalystCast[Any, Any] {}
  implicit val BId2AId: CatalystCast[BId, AId] = theInstance.asInstanceOf[CatalystCast[BId, AId]]
  implicit val CId2BId: CatalystCast[CId, BId] = theInstance.asInstanceOf[CatalystCast[CId, BId]]
  implicit val DId2AId: CatalystCast[DId, AId] = theInstance.asInstanceOf[CatalystCast[DId, AId]]

  // Case classes for TypedDatasets
  case class A(id: AId)
  case class B(id: BId, name: Name)
  case class B1(id: BId)
  case class C(id: CId, name: Name, size: Size)
  case class D(id: DId, length: Length)

  // Project a C down to a B: widen its CId to a BId (via CId2BId) and keep the name.
  implicit val C2BSmartProject: SmartProject[C, B] = SmartProject[C, B]( (x: TypedDataset[C]) => {
    val x_id: TypedColumn[C, BId] = x.col[CId]('id).cast[BId]
    val x_name: TypedColumn[C, Name] = x.col[Name]('name)
    x.select[BId, Name](x_id, x_name).as[B]
  })

  val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
  implicit val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  implicit val sqlContext: SQLContext = spark.sqlContext
  import frameless.syntax.DefaultSparkDelay

  val bs: TypedDataset[B] = TypedDataset.create[B](Seq(
    B(bid("12345"), name("Mark")),
    B(bid("23456"), name("Jane"))
  ))

  val cs: TypedDataset[C] = TypedDataset.create[C](Seq(
    C(cid("02468"), name("Table"), size(8)),
    C(cid("13579"), name("Chair"), size(4))
  ))

  val ds: TypedDataset[D] = TypedDataset.create[D](Seq(
    D(did("03846"), length(2)),
    D(did("19384"), length(3))
  ))

  // The union is well-typed because C projects to B via C2BSmartProject above.
  val bcs: TypedDataset[B] = bs.union(cs.project[B])
  val bcs_collection = bcs.collect().run()
  require(bcs_collection.size == 4)

  // Compilation errors if uncommented.
  // val b1s: TypedDataset[B1] = bcs.project[B1]
  // val b1s_collection = b1s.collect().run()
  // require(b1s_collection.size == 4)
  //
  // val ab1s: TypedDataset[A] = b1s.project[A] // line 121
  // val ab1s_collection = ab1s.collect().run()
  // require(ab1s_collection.size == 4)
  //
  // implicit val B2ASmartProject: SmartProject[B, A] = SmartProject[B, A]( (x: TypedDataset[B]) => {
  //   val x_id: TypedColumn[B, AId] = x.col[BId]('id).cast[AId]
  //   x.select[AId](x_id).as[A] // line 127
  // })
  //
  // implicit val D2ASmartProject: SmartProject[D, A] = SmartProject[D, A]( (x: TypedDataset[D]) => {
  //   val x_id: TypedColumn[D, AId] = x.col[DId]('id).cast[AId]
  //   x.select[AId](x_id).as[A] // line 132
  // })
  //
  // val all: TypedDataset[A] = bcs.project[A].union(ds.project[A])
  // val all_collection = all.collect().run()
  // require(all_collection.size == 6)

  // Compilation errors if the above lines are all uncommented:
  // Error:(121, 42) Cannot prove that tags.s2.B1 can be projected to tags.s2.A. Perhaps not all member names and types of tags.s2.A are the same in tags.s2.B1?
  // val ab1s: TypedDataset[A] = b1s.project[A] // line 121
  // Error:(121, 42) not enough arguments for method project: (implicit projector: frameless.ops.SmartProject[tags.s2.B1,tags.s2.A])frameless.TypedDataset[tags.s2.A].
  // Unspecified value parameter projector.
  // val ab1s: TypedDataset[A] = b1s.project[A] // line 121
  // Error:(127, 27) could not find implicit value for parameter as: frameless.ops.As[tags.s2.AId,tags.s2.A]
  // x.select[AId](x_id).as[A] // line 127
  // Error:(127, 27) not enough arguments for method as: (implicit as: frameless.ops.As[tags.s2.AId,tags.s2.A])frameless.TypedDataset[tags.s2.A].
  // Unspecified value parameter as.
  // x.select[AId](x_id).as[A] // line 127
  // Error:(132, 27) could not find implicit value for parameter as: frameless.ops.As[tags.s2.AId,tags.s2.A]
  // x.select[AId](x_id).as[A] // line 132
  // Error:(132, 27) not enough arguments for method as: (implicit as: frameless.ops.As[tags.s2.AId,tags.s2.A])frameless.TypedDataset[tags.s2.A].
  // Unspecified value parameter as.
  // x.select[AId](x_id).as[A] // line 132
}