@wiikviz
Created November 26, 2019 15:54
Test
package ru.sber

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SimpleApp").master("local").getOrCreate()
    import spark.implicits._

    // One row per day: (visitId, date, totalVisits).
    Seq(
      (1, "2019-01-01", 101),
      (2, "2019-01-02", 102),
      (3, "2019-01-03", 103),
      (4, "2019-01-04", 99),
      (5, "2019-01-05", 99),
      (6, "2019-01-06", 101),
      (7, "2019-01-07", 99),
      (8, "2019-01-08", 101),
      (9, "2019-01-09", 101),
      (10, "2019-01-10", 101),
      (11, "2019-01-11", 101),
      (12, "2019-01-12", 99),
      (13, "2019-01-13", 101),
      (14, "2019-01-14", 101)
    )
      .toDF("visitId", "date", "totalVisits")
      .createOrReplaceTempView("visits")

    // Keep every row that belongs to a run of at least three consecutive rows
    // (ordered by date) with totalVisits > 100. A row qualifies if it starts
    // such a run (onTop), sits inside one (onCenter), or ends one (onBottom).
    // LEAD/LAG return the default 0 past the first or last row, so those
    // out-of-range neighbours never count as > 100.
    // The window has no PARTITION BY, so Spark moves all rows into a single
    // partition (the Exchange SinglePartition step in the plan).
    val df = spark.sql(
      """
        |SELECT visitId, date, totalVisits FROM (
        |  SELECT
        |    visitId,
        |    date,
        |    totalVisits,
        |    totalVisits > 100 AND LEAD(totalVisits, 1, 0) OVER (ORDER BY date) > 100 AND LEAD(totalVisits, 2, 0) OVER (ORDER BY date) > 100 AS onTop,
        |    totalVisits > 100 AND LAG(totalVisits, 1, 0) OVER (ORDER BY date) > 100 AND LEAD(totalVisits, 1, 0) OVER (ORDER BY date) > 100 AS onCenter,
        |    totalVisits > 100 AND LAG(totalVisits, 1, 0) OVER (ORDER BY date) > 100 AND LAG(totalVisits, 2, 0) OVER (ORDER BY date) > 100 AS onBottom
        |  FROM visits
        |)
        |WHERE onTop OR onCenter OR onBottom
        |""".stripMargin)

    df.explain() // prints the physical plan
    df.show()    // prints the matching rows
    spark.stop()
  }
}
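The flag logic can be checked without starting Spark. The following is a plain-Scala emulation, not Spark's implementation: `WindowFlagsSketch`, `Visit`, `shifted` and `consecutiveAbove` are hypothetical names, and the sketch assumes the rows fit in memory and that sorting the ISO date strings matches `ORDER BY date`. `shifted` mimics `LEAD`/`LAG` with a default of 0 past either end of the ordered rows.

```scala
// Plain-Scala emulation of the onTop / onCenter / onBottom flags (no Spark needed).
// All names here are hypothetical; rows are assumed to fit in memory.
object WindowFlagsSketch {
  final case class Visit(visitId: Int, date: String, totalVisits: Int)

  // Same data as the gist: visitId i has date 2019-01-i.
  val sample: Seq[Visit] =
    Seq(101, 102, 103, 99, 99, 101, 99, 101, 101, 101, 101, 99, 101, 101)
      .zipWithIndex
      .map { case (total, i) => Visit(i + 1, f"2019-01-${i + 1}%02d", total) }

  // Mimics LEAD(col, offset, default) for offset > 0 and LAG(col, -offset, default)
  // for offset < 0: read the value `offset` rows away, or `default` past either end.
  def shifted(xs: IndexedSeq[Int], i: Int, offset: Int, default: Int = 0): Int =
    xs.lift(i + offset).getOrElse(default)

  def consecutiveAbove(rows: Seq[Visit], threshold: Int = 100): Seq[Visit] = {
    val sorted = rows.sortBy(_.date).toIndexedSeq // ISO date strings sort chronologically
    val totals = sorted.map(_.totalVisits)
    def above(i: Int, offset: Int): Boolean = shifted(totals, i, offset) > threshold

    sorted.indices.collect {
      case i if above(i, 0) && (
        (above(i, 1) && above(i, 2)) ||  // onTop
        (above(i, -1) && above(i, 1)) || // onCenter
        (above(i, -1) && above(i, -2))   // onBottom
      ) => sorted(i)
    }
  }

  def main(args: Array[String]): Unit =
    consecutiveAbove(sample).foreach(println)
}
```

On the sample data it selects visit ids 1, 2, 3, 8, 9, 10 and 11, the same rows as the Spark output. Rows 13 and 14 drop out because looking two rows past row 13 (or one row past row 14) falls off the end and yields the default 0.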
== Physical Plan ==
*(2) Project [visitId#7, date#8, totalVisits#9]
+- *(2) Filter ((totalVisits#9 > 100) && ((((_we0#26 > 100) && (_we1#27 > 100)) || ((_we2#28 > 100) && (_we3#29 > 100))) || ((_we4#30 > 100) && (_we5#31 > 100))))
   +- Window [lead(totalVisits#9, 1, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS _we0#26, lead(totalVisits#9, 2, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 2)) AS _we1#27, lag(totalVisits#9, 1, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we2#28, lead(totalVisits#9, 1, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS _we3#29, lag(totalVisits#9, 1, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we4#30, lag(totalVisits#9, 2, 0) windowspecdefinition(date#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, -2)) AS _we5#31], [date#8 ASC NULLS FIRST]
      +- *(1) Sort [date#8 ASC NULLS FIRST], false, 0
         +- Exchange SinglePartition
            +- LocalTableScan [visitId#7, date#8, totalVisits#9]
+-------+----------+-----------+
|visitId|      date|totalVisits|
+-------+----------+-----------+
|      1|2019-01-01|        101|
|      2|2019-01-02|        102|
|      3|2019-01-03|        103|
|      8|2019-01-08|        101|
|      9|2019-01-09|        101|
|     10|2019-01-10|        101|
|     11|2019-01-11|        101|
+-------+----------+-----------+
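The three flags hard-code a run length of exactly three rows, so requiring, say, four days in a row would mean adding more LEAD/LAG terms. A different technique, often called "gaps and islands", groups the date-ordered rows into maximal runs above the threshold and keeps the runs that are long enough. The plain-Scala sketch here illustrates that idea in memory; `RunLengthSketch`, `idsInLongRuns` and its `threshold`/`minRun` parameters are hypothetical names, not part of the gist. Like the SQL query, it treats adjacent rows as consecutive and does not check for missing dates.

```scala
// Hypothetical "gaps and islands" variant: keep rows that sit in a run of at
// least `minRun` consecutive rows (ordered by date) with totalVisits > threshold.
object RunLengthSketch {
  type Row = (Int, String, Int) // (visitId, date, totalVisits), as in the gist

  val sample: Seq[Row] =
    Seq(101, 102, 103, 99, 99, 101, 99, 101, 101, 101, 101, 99, 101, 101)
      .zipWithIndex
      .map { case (total, i) => (i + 1, f"2019-01-${i + 1}%02d", total) }

  def idsInLongRuns(rows: Seq[Row], threshold: Int = 100, minRun: Int = 3): Seq[Int] = {
    // Split the date-ordered rows into maximal runs ("islands") above the threshold;
    // a row at or below the threshold closes the current run.
    val runs = rows.sortBy(_._2).foldLeft(Vector(Vector.empty[Row])) { (acc, row) =>
      if (row._3 > threshold) acc.init :+ (acc.last :+ row)
      else if (acc.last.isEmpty) acc
      else acc :+ Vector.empty[Row]
    }
    // Keep only runs that are long enough and return their visit ids in date order.
    runs.filter(_.size >= minRun).flatMap(_.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    println(idsInLongRuns(sample))
    println(idsInLongRuns(sample, minRun = 2))
  }
}
```

With the default `minRun = 3` it returns ids 1, 2, 3, 8, 9, 10 and 11, matching the table above; with `minRun = 2` the trailing pair 13 and 14 is kept as well. The run length becomes a parameter instead of a set of hand-written flag expressions.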