如何将RDD [(Key,Value)]转换为Map [Key,RDD [Value]]

Seb*_*Seb 6 scala bigdata apache-spark rdd

我搜索了很长一段时间的解决方案,但没有得到任何正确的算法.

在scala中使用Spark RDD,如何将a RDD[(Key, Value)]转换为a Map[key, RDD[Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法?

实际上,我的最终目标是Map[Key, RDD[Value]]按键循环并saveAsNewAPIHadoopFile为每个调用RDD[Value]

例如,如果我得到:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]
Run Code Online (Sandbox Code Playgroud)

我想要 :

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]
Run Code Online (Sandbox Code Playgroud)

我想知道filter在每个键A,B,C上使用它是否花费不太多RDD[(Key, Value)],但是我不知道是否多次调用过滤器有不同的键会有效吗?(当然不是,但可能使用cache?)

谢谢

0x0*_*FFF 1

您应该使用这样的代码(Python):

rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
    out.saveAsNewAPIHadoopFile (...)
Run Code Online (Sandbox Code Playgroud)

一个 RDD 不能成为另一个 RDD 的一部分,并且您无法选择仅收集键并将其相关值转换为单独的 RDD。在我的示例中,您将迭代缓存的 RDD,这是可以的并且可以快速工作