我收到此错误,但我不知道为什么。基本上我是从此代码错误:
a = data.mapPartitions(helper(locations))
Run Code Online (Sandbox Code Playgroud)
其中数据是RDD,而我的助手定义为:
def helper(iterator, locations):
for x in iterator:
c = locations[x]
yield c
Run Code Online (Sandbox Code Playgroud)
(位置只是数据点的数组)我看不出问题出在哪里,但我也不是pyspark的佼佼者,所以有人可以告诉我为什么我无法从此代码中获得“ PipelinedRDD”对象吗?
RDD 可以使用 map 和 lambda 函数进行迭代。我使用以下方法遍历了流水线 RDD
lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")
pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))
pair_result = pairs1.union(pairs2)
pair_result.reduceByKey(lambda a, b: a + ','+ b)
result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]
Run Code Online (Sandbox Code Playgroud)
===> result_ll = [list(elem) for elem in result]
类型错误:“PipelinedRDD”对象不可迭代
相反,我使用 map 函数替换了迭代
result_ll = result.map( lambda elem: list(elem))
Run Code Online (Sandbox Code Playgroud)
希望这有助于相应地修改您的代码
归档时间: |
|
查看次数: |
16526 次 |
最近记录: |