在RDD中添加递增变量

Mpi*_*ris 1 accumulator apache-spark pyspark

假设我有以下RDD:

test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
Run Code Online (Sandbox Code Playgroud)

如何创建以下rdd:

((1,'trial1',[1,2]),(2,'trial2',[3,4]))
Run Code Online (Sandbox Code Playgroud)

我尝试使用累加器但它不起作用,因为累加器无法在任务中访问:

def increm(keyvalue):
    global acc
    acc +=1
    return (acc.value,keyvalue[0],keyvalue[1])


acc = sc.accumulator(0)
test1RDD.map(lambda x: increm(x)).collect()
Run Code Online (Sandbox Code Playgroud)

知道如何做到这一点?

cch*_*son 7

你可以使用zipWithIndex

zipWithIndex()

用它的元素索引来拉开这个RDD.

排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目获得最大索引.

当此RDD包含多个分区时,此方法需要触发spark作业.

  >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
Run Code Online (Sandbox Code Playgroud)

并用于map转换RDD以使索引位于新RDD之前

这是未经测试的,因为我没有任何环境:

test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
test1RDD.zipWithIndex().map(lambda x : (x[1],x[0]))
Run Code Online (Sandbox Code Playgroud)