如何将 groupBy().count() 添加到源 DataFrame?

2 scala apache-spark apache-spark-sql

我有以下数据框:

+---------------+--------------+--------------+-----+
|        column0|       column1|       column2|label|
+---------------+--------------+--------------+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|
+---------------+--------------+--------------+-----+
Run Code Online (Sandbox Code Playgroud)

我想应用 groupBy 并依靠它并得出以下结果:

+--------------+--------------+-----+
|       column1|       column2|count|
+--------------+--------------+-----+
|10.0.0.2.54880| 10.0.0.3.5001|   19|
| 10.0.0.3.5001|10.0.0.2.54880|   10|
+--------------+--------------+-----+
Run Code Online (Sandbox Code Playgroud)

我知道我必须使用这个:

dataFrame_Train.groupBy("column1", "column2").count().show()
Run Code Online (Sandbox Code Playgroud)

但是问题是我需要将“计数”列作为永久列添​​加到我的数据框中。 在上述情况下,如果我dataFrame_Train.show()在 之后使用groupBy,我会看到没有“计数”列的第一个数据帧。这段代码:

dataFrame_Train.groupBy("column1", "column2").count().show()
`dataFrame_Train.show()`
Run Code Online (Sandbox Code Playgroud)

你能帮我添加groupBy("column1", "column2").count()到数据框吗?(因为我将来需要使用“计数”列来训练数据)谢谢。

Jac*_*ski 6

@eliasah 的回答很好,但可能不是最有效的、代码和性能方面的。

窗口聚合函数(又名窗口聚合)

每当您发现需要groupByand 时join,尤其是。对于像这样的简单用例,请考虑窗口聚合函数

之间的主要区别groupBy和窗口聚集之处在于前者给你最多的行数在源数据集,而后者(窗口集合体)为您提供准确的行数与源数据集。这似乎完全符合您的要求,不是吗?

有了这个,让我们看看代码。

import org.apache.spark.sql.expressions.Window
val columns1and2 = Window.partitionBy("column1", "column2") // <-- matches groupBy

import org.apache.spark.sql.functions._
// using count aggregate function over entire partition frame
val counts = ips.withColumn("count", count($"label") over columns1and2)
scala> counts.show
+---------------+--------------+--------------+-----+-----+
|        column0|       column1|       column2|label|count|
+---------------+--------------+--------------+-----+-----+
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604900|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604899|10.0.0.2.54880| 10.0.0.3.5001|    2|   13|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
|05:49:56.604908| 10.0.0.3.5001|10.0.0.2.54880|    2|    7|
+---------------+--------------+--------------+-----+-----+
Run Code Online (Sandbox Code Playgroud)

完毕!干净又方便。这就是我心爱的窗口聚合函数!

性能比较

有趣的来了。这和@eliasah 的解决方案之间的区别只是纯粹的语法吗?我不这么认为(但我仍在学习如何得出正确的结论)。查看执行计划并自行判断。

下面是窗口聚合的执行计划。

在此处输入图片说明

然而,以下是执行计划groupByjoin(我不得不把两个截图的计划是太大的一个包括)。

在此处输入图片说明 在此处输入图片说明

Job-wisegroupByjoinquery 轻松击败了窗口聚合,前者有 2 个 Spark 作业,后者有 5 个。

操作员方面,它们的数量和最重要的交换(它们是 Spark SQL 的 shuffle),窗口聚合可能已经击败groupByjoin.