Jay*_*Zee 8 apache-spark apache-spark-sql
I have a dataframe
| id | date | KPI_1 | ... | KPI_n
| 1 |2012-12-12 | 0.1 | ... | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46
| 4 |2012-12-14 | 0.2 | ... | 0.45
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
I have to compute, for each row, some derived KPIs that depend on the previous values for the same id. Let's say my derived KPI is a diff; it would be:
| id | date | KPI_1 | ... | KPI_n | KPI_1_diff | KPI_n_diff
| 1 |2012-12-12 | 0.1 | ... | 0.5 | 0.1 | 0.5
| 2 |2012-12-12 | 0.2 | ... | 0.4 | 0.2 |0.4
| 3 |2012-12-12 | 0.66 | ... | 0.66 | 0.66 | 0.66
| 1 |2012-12-13 | 0.2 | ... | 0.46 | 0.2-0.1 | 0.46 - 0.5
| 4 |2012-12-14 | 0.2 | ... | 0.45 ...
| ...
| 55| 2013-03-15 | 0.5 | ... | 0.55
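For concreteness, here is a minimal sketch of what the simple diff above could look like when expressed with a lag window (purely illustrative and not one of the two approaches compared below; it assumes the KPI columns are doubles and treats the missing previous value of an id as 0.0):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lag, lit}

val byId = Window.partitionBy("id").orderBy("date")

// KPI_x_diff = current value minus the previous value for the same id;
// for the first row of an id the previous value is treated as 0.0.
val withDiffs = Seq("KPI_1", "KPI_n").foldLeft(myDF) { (acc, kpi) =>
  acc.withColumn(s"${kpi}_diff",
    col(kpi) - coalesce(lag(col(kpi), 1).over(byId), lit(0.0)))
}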
Now, what I would do is:
import org.apache.spark.sql.functions.{col, collect_list, struct}

val groupedDF = myDF.groupBy("id").agg(
  collect_list(struct(col("date"), col("KPI_1"))).as("wrapped_KPI_1"),
  collect_list(struct(col("date"), col("KPI_2"))).as("wrapped_KPI_2")
  // up until the nth KPI
)
I would obtain aggregated data such as:
[("2012-12-12",0.1),("2012-12-12",0.2) ...
Then I would sort this wrapped data, and with some UDF unpack and map these aggregated results to produce the output (computing the diffs and other statistics).
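For illustration, a minimal sketch of what such an unpacking UDF could look like for one KPI (names are hypothetical; it assumes date is stored as a string, the KPI values as doubles, and a Spark version where a UDF can receive an array of structs as Seq[Row]):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

// Sort the collected (date, KPI_1) structs by date and return the
// consecutive differences; the first value is kept as-is.
val diffUdf = udf { wrapped: Seq[Row] =>
  val sorted = wrapped
    .map(r => (r.getAs[String]("date"), r.getAs[Double]("KPI_1")))
    .sortBy(_._1)
  sorted.zip(("", 0.0) +: sorted).map { case ((_, cur), (_, prev)) => cur - prev }
}

val diffDF = groupedDF.withColumn("KPI_1_diff", diffUdf(col("wrapped_KPI_1")))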
The alternative would be to use window functions, e.g.:
import org.apache.spark.sql.expressions.Window

val window = Window.partitionBy(col("id")).orderBy(col("date")).rowsBetween(Window.unboundedPreceding, 0L)
and then do:
val windowedDF = df.select (
col("id"),
col("date"),
col("KPI_1"),
collect_list(struct(col("date"),col("KPI_1"))).over(window),
collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
This way I get:
[("2012-12-12",0.1)]
[("2012-12-12",0.1), ("2012-12-13",0.1)]
...
This looks nicer to work with, but I suspect that repeating the window produces unnecessary grouping and sorting for every KPI.
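One quick way to check that suspicion is to look at the physical plan and count the Exchange (shuffle) operators it contains:

// Each Exchange node in the plan corresponds to one shuffle of the data.
windowedDF.explain()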
So here is the question:
Apu*_*dey 15
I believe the window approach should be a better solution, but before applying the window functions you should re-partition the dataframe based on id. This will shuffle the data only once, and all the window functions will then be executed on the already shuffled dataframe. I hope it helps.
The code should look like this:
val windowedDF = df.repartition(col("id"))
.select (
col("id"),
col("date"),
col("KPI_1"),
col("KPI_2"),
collect_list(struct(col("date"),col("KPI_1"))).over(window),
collect_list(struct(col("date"),col("KPI_2"))).over(window)
)
@Raphael Roth
Here we are aggregating over a single window, which is why you might be seeing the same execution plan. See the example below, where the aggregation over multiple windows is done from a single partitioning.
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list

val list = Seq(( "2", null, 1, 11, 1, 1 ),
( "2", null, 1, 22, 2, 2 ),
( "2", null, 1, 11, 1, 3 ),
( "2", null, 1, 22, 2, 1 ),
( "2", null, 1, 33, 1, 2 ),
( null, "3", 3, 33, 1, 2 ),
( null, "3", 3, 33, 2, 3 ),
( null, "3", 3, 11, 1, 1 ),
( null, "3", 3, 22, 2, 2 ),
( null, "3", 3, 11, 1, 3 )
)
val df = spark.sparkContext.parallelize(list).toDF("c1","c2","batchDate","id", "pv" , "vv")
val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")
val agg1df = df.withColumn("c1List",collect_list("pv").over(c1Window))
.withColumn("c2List", collect_list("pv").over(c2Window))
val agg2df = df.repartition($"batchDate")
.withColumn("c1List",collect_list("pv").over(c1Window))
.withColumn("c2List", collect_list("pv").over(c2Window))
agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c2#15, 1)
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c1#14, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
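Note the difference between the two plans: agg1df requires two Exchange steps, one hash-partitioning on (batchDate, c1) and another on (batchDate, c2), whereas agg2df, thanks to the explicit repartition on batchDate, contains a single Exchange, and both Window operators run on the already shuffled data, each preceded only by a Sort.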