pyspark:“ PipelinedRDD”对象不可迭代

dee*_*mvp 5 rdd pyspark

我收到此错误,但我不知道为什么。基本上我是从此代码错误:

    a = data.mapPartitions(helper(locations))
Run Code Online (Sandbox Code Playgroud)

其中数据是RDD,而我的助手定义为:

    def helper(iterator, locations): 
        for x in iterator:
            c = locations[x]
            yield c
Run Code Online (Sandbox Code Playgroud)

(位置只是数据点的数组)我看不出问题出在哪里,但我也不是pyspark的佼佼者,所以有人可以告诉我为什么我无法从此代码中获得“ PipelinedRDD”对象吗?

Ara*_*mar 5

RDD 可以使用 map 和 lambda 函数进行迭代。我使用以下方法遍历了流水线 RDD

lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")

pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))

pair_result = pairs1.union(pairs2)

pair_result.reduceByKey(lambda a, b: a + ','+ b)

result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]
Run Code Online (Sandbox Code Playgroud)

===> result_ll = [list(elem) for elem in result]

类型错误:“PipelinedRDD”对象不可迭代

相反,我使用 map 函数替换了迭代

result_ll = result.map( lambda elem: list(elem))
Run Code Online (Sandbox Code Playgroud)

希望这有助于相应地修改您的代码