如何添加具有最大值的新列?

Nak*_*euh 1 scala apache-spark apache-spark-sql

我有一个包含2列tag和的数据框value

我想补充一点,包含新列maxvalue列。(对于每行它将是相同的值)。

我尝试执行以下操作,但是没有成功。

val df2 = df.withColumn("max",max($"value"))
Run Code Online (Sandbox Code Playgroud)

如何将max列添加到数据集?

Jac*_*ski 5

有3种方法可以做到(一种已经从另一种答案中知道)。我避免了,collect因为它不是真正需要的。

这是最大值3出现两次的数据集。

val tags = Seq(
  ("tg1", 1), ("tg2", 2), ("tg1", 3), ("tg4", 4), ("tg3", 3)
).toDF("tag", "value")
scala> tags.show
+---+-----+
|tag|value|
+---+-----+
|tg1|    1|
|tg2|    2|
|tg1|    3| <-- maximum value
|tg4|    4|
|tg3|    3| <-- another maximum value
+---+-----+
Run Code Online (Sandbox Code Playgroud)

笛卡尔联接与“最大”数据集

我将使用的笛卡尔联接tags和具有最大值的单行数据集。

val maxDF = tags.select(max("value") as "max")
scala> maxDF.show
+---+
|max|
+---+
|  4|
+---+
val solution = tags.crossJoin(maxDF)
scala> solution.show
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1|    1|  4|
|tg2|    2|  4|
|tg1|    3|  4|
|tg4|    4|  4|
|tg3|    3|  4|
+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

我不担心这里的笛卡尔联接,因为它只是一个单行数据集。

窗口聚合

我最喜欢的窗口聚合非常适合此问题。另一方面,由于正在使用的分区数量(即只有1个),我并不真正认为这是最有效的方法,这可能会带来最坏的并行性。

诀窍是max在空窗口规范上使用聚合函数,该规范会通知Spark SQL以任何顺序使用所有行。

val solution = tags.withColumn("max", max("value") over ())
scala> solution.show
18/05/31 21:59:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1|    1|  4|
|tg2|    2|  4|
|tg1|    3|  4|
|tg4|    4|  4|
|tg3|    3|  4|
+---+-----+---+
Run Code Online (Sandbox Code Playgroud)

请注意所有警告。

WindowExec:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。

鉴于其他解决方案,我不会使用这种方法,而将其留在这里用于教育目的。