在spark中修改了countByKey

Hi *_*llo 2 scala apache-spark apache-spark-sql

我有一个数据框,如下所示:

+------+-------+
| key  | label |
+------+-------+
| key1 | a     |
| key1 | b     |
| key2 | a     |
| key2 | a     |
| key2 | a     |
+------+-------+
Run Code Online (Sandbox Code Playgroud)

我想要在spark中更改countByKeys的版本,该版本返回如下输出:

+------+-------+
| key  | count |
+------+-------+
| key1 |     0 |
| key2 |     3 |
+------+-------+
//explanation: 
if all labels under a key are same, then return count of all rows under a key 
else count for that key is 0
Run Code Online (Sandbox Code Playgroud)

我解决这个问题的方法:

脚步:

  1. reduceByKey() :连接所有标签(将标签视为字符串)以获取类型的数据框 < key,concat_of_all_labels >
  2. mapValues():按字符解析每个字符串以检查是否都相同。如果它们的返回标签数相同,则返回0。

我是新手,我觉得应该有一些有效的方法来完成这项工作。有没有更好的方法来完成此任务?

Kom*_*owy 6

这非常简单:通过键同时获得计数和非重复计数,然后...然后...

val df = Seq(("key1", "a"), ("key1", "b"), ("key2", "a"), ("key2", "a"), ("key2", "a")).toDF("key", "label")
df.groupBy('key)
  .agg(countDistinct('label).as("cntDistinct"), count('label).as("cnt"))
  .select('key, when('cntDistinct === 1, 'cnt).otherwise(typedLit(0)).as("count"))
  .show

+----+-----+
| key|count|
+----+-----+
|key1|    0|
|key2|    3|
+----+-----+
Run Code Online (Sandbox Code Playgroud)