Spark - 按键分组,然后按值计数

Bri*_*ian 3 scala mapreduce key-value apache-spark rdd

我有一些非唯一的键值对,我使用的map函数创建了RDD Array[String]

val kvPairs = myRdd.map(line => (line(0), line(1)))
Run Code Online (Sandbox Code Playgroud)

这会生成以下格式的数据:

1, A
1, A
1, B
2, C
Run Code Online (Sandbox Code Playgroud)

我想按照它们的值对所有键进行分组,并提供这些值的计数,如下所示:

1, {(A, 2), (B, 1)}
2, {(C, 1)}
Run Code Online (Sandbox Code Playgroud)

我尝试了很多不同的尝试,但我能得到的最接近的是这样的:

kvPairs.sortByKey().countByValue()
Run Code Online (Sandbox Code Playgroud)

这给了

1, (A, 2)
1, (B, 1)
2, (C, 1)
Run Code Online (Sandbox Code Playgroud)

也,

kvPairs.groupByKey().sortByKey()
Run Code Online (Sandbox Code Playgroud)

提供价值,但它仍然不是那里:

1, {(A, A, B)}
2, {(C)}
Run Code Online (Sandbox Code Playgroud)

我尝试将两者结合在一起:

kvPairs.countByValue().groupByKey().sortByKey()
Run Code Online (Sandbox Code Playgroud)

但这会返回错误

错误:value groupByKey不是scala.collection.Map的成员[(String,String),Long]

zer*_*323 8

只需直接计算对,然后分组(如果必须):

kvPairs.map((_, 1L))
  .reduceByKey(_ + _)
  .map{ case ((k, v), cnt) => (k, (v, cnt)) }
  .groupByKey
Run Code Online (Sandbox Code Playgroud)

如果你想gropuByKey在减少后想要使用自定义分区器,它只考虑密钥的第一个元素.您可以检查RDD拆分并在新RDD上进行聚合以获得示例实现.