Spark SQL 使用 foldLeft 和 withColumn 替代 groupby/pivot/agg/collect_list 以提高性能

Abi*_*rty 5 apache-spark apache-spark-sql apache-spark-dataset

我有一个由三列组成的 Spark DataFrame:

 id | col1 | col2 
-----------------
 x  |  p1  |  a1  
-----------------
 x  |  p2  |  b1
-----------------
 y  |  p2  |  b2
-----------------
 y  |  p2  |  b3
-----------------
 y  |  p3  |  c1
Run Code Online (Sandbox Code Playgroud)

申请后,df.groupBy("id").pivot("col1").agg(collect_list("col2"))我得到以下数据帧(aggDF):

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)

然后我找到除了列之外的id列的名称。

val cols = aggDF.columns.filter(x => x != "id")
Run Code Online (Sandbox Code Playgroud)

之后我cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))null. 当列数增加时,此代码的性能会变差。另外,我有字符串 columns 的名称val stringColumns = Array("p1","p3")。我想获得以下最终数据框:

+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x| a1 |    [b1]|null|
|  y|null|[b2, b3]| c1 |
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)

为了实现最终的数据框,有没有更好的解决方案来解决这个问题?

the*_*tom 0

如果您查看https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015,那么您会发现带有foldLeft的withColumn存在已知的性能问题。Select 是一种替代方案,如下所示 - 使用可变参数。

不相信collect_list是一个问题。我也保留了第一组逻辑。枢轴启动作业以获得枢轴的不同值。在我看来,这是一种可以接受的方法。尝试自己推出对我来说似乎毫无意义,但其他答案可能证明我错了或者 Spark 2.4 已经得到了改进。

import spark.implicits._ 
import org.apache.spark.sql.functions._

// Your code & assumig id is only col of interest as in THIS question. More elegant than 1st posting.
val df = Seq( ("x","p1","a1"), ("x","p2","b1"), ("y","p2","b2"), ("y","p2","b3"), ("y","p3","c1")).toDF("id", "col1", "col2")
val aggDF = df.groupBy("id").pivot("col1").agg(collect_list("col2")) 
//aggDF.show(false)

val colsToSelect = aggDF.columns  // All in this case, 1st col id handled by head & tail

val aggDF2 = aggDF.select((col(colsToSelect.head) +: colsToSelect.tail.map
    (col => when(size(aggDF(col)) === 0,lit(null)).otherwise(aggDF(col)).as(s"$col"))):_*)
aggDF2.show(false)
Run Code Online (Sandbox Code Playgroud)

返回:

+---+----+--------+----+
|id |p1  |p2      |p3  |
+---+----+--------+----+
|x  |[a1]|[b1]    |null|
|y  |null|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)

顺便说一句,也很好读:https://lansalo.com/2018/05/13/spark-how-to-add-multiple-columns-in-dataframes-and-how-not-to/。列数越多,效果越明显。最后,一位读者提出了一个相关的观点。

我认为当列数较多时,选择方法的性能会更好。

UPD:在假期期间,我使用 Spark 2.4.x 尝试了这两种方法,在最多 1000 列的情况下几乎没有观察到差异。这让我很困惑。