Nak*_*euh 1 scala apache-spark apache-spark-sql
我有一个包含2列tag和的数据框value。
我想补充一点,包含新列max的value列。(对于每行它将是相同的值)。
我尝试执行以下操作,但是没有成功。
val df2 = df.withColumn("max",max($"value"))
Run Code Online (Sandbox Code Playgroud)
如何将max列添加到数据集?
有3种方法可以做到(一种已经从另一种答案中知道)。我避免了,collect因为它不是真正需要的。
这是最大值3出现两次的数据集。
val tags = Seq(
("tg1", 1), ("tg2", 2), ("tg1", 3), ("tg4", 4), ("tg3", 3)
).toDF("tag", "value")
scala> tags.show
+---+-----+
|tag|value|
+---+-----+
|tg1| 1|
|tg2| 2|
|tg1| 3| <-- maximum value
|tg4| 4|
|tg3| 3| <-- another maximum value
+---+-----+
Run Code Online (Sandbox Code Playgroud)
我将使用的笛卡尔联接tags和具有最大值的单行数据集。
val maxDF = tags.select(max("value") as "max")
scala> maxDF.show
+---+
|max|
+---+
| 4|
+---+
val solution = tags.crossJoin(maxDF)
scala> solution.show
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1| 1| 4|
|tg2| 2| 4|
|tg1| 3| 4|
|tg4| 4| 4|
|tg3| 3| 4|
+---+-----+---+
Run Code Online (Sandbox Code Playgroud)
我不担心这里的笛卡尔联接,因为它只是一个单行数据集。
我最喜欢的窗口聚合非常适合此问题。另一方面,由于正在使用的分区数量(即只有1个),我并不真正认为这是最有效的方法,这可能会带来最坏的并行性。
诀窍是max在空窗口规范上使用聚合函数,该规范会通知Spark SQL以任何顺序使用所有行。
val solution = tags.withColumn("max", max("value") over ())
scala> solution.show
18/05/31 21:59:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1| 1| 4|
|tg2| 2| 4|
|tg1| 3| 4|
|tg4| 4| 4|
|tg3| 3| 4|
+---+-----+---+
Run Code Online (Sandbox Code Playgroud)
请注意所有警告。
WindowExec:没有为窗口操作定义分区!将所有数据移动到单个分区,这可能会导致严重的性能下降。
鉴于其他解决方案,我不会使用这种方法,而将其留在这里用于教育目的。
| 归档时间: |
|
| 查看次数: |
1395 次 |
| 最近记录: |