Seb*_*Seb 6 scala bigdata apache-spark rdd
我搜索了很长一段时间的解决方案,但没有得到任何正确的算法.
在scala中使用Spark RDD,如何将a RDD[(Key, Value)]转换为a Map[key, RDD[Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法?
实际上,我的最终目标是Map[Key, RDD[Value]]按键循环并saveAsNewAPIHadoopFile为每个调用RDD[Value]
例如,如果我得到:
RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]
Run Code Online (Sandbox Code Playgroud)
我想要 :
Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]
Run Code Online (Sandbox Code Playgroud)
我想知道filter在每个键A,B,C上使用它是否花费不太多RDD[(Key, Value)],但是我不知道是否多次调用过滤器有不同的键会有效吗?(当然不是,但可能使用cache?)
谢谢
您应该使用这样的代码(Python):
rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
out.saveAsNewAPIHadoopFile (...)
Run Code Online (Sandbox Code Playgroud)
一个 RDD 不能成为另一个 RDD 的一部分,并且您无法选择仅收集键并将其相关值转换为单独的 RDD。在我的示例中,您将迭代缓存的 RDD,这是可以的并且可以快速工作