Mpi*_*ris 1 accumulator apache-spark pyspark
假设我有以下RDD:
test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
Run Code Online (Sandbox Code Playgroud)
如何创建以下rdd:
((1,'trial1',[1,2]),(2,'trial2',[3,4]))
Run Code Online (Sandbox Code Playgroud)
我尝试使用累加器但它不起作用,因为累加器无法在任务中访问:
def increm(keyvalue):
global acc
acc +=1
return (acc.value,keyvalue[0],keyvalue[1])
acc = sc.accumulator(0)
test1RDD.map(lambda x: increm(x)).collect()
Run Code Online (Sandbox Code Playgroud)
知道如何做到这一点?
你可以使用zipWithIndex
zipWithIndex()
用它的元素索引来拉开这个RDD.
排序首先基于分区索引,然后是每个分区内的项目顺序.因此,第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目获得最大索引.
当此RDD包含多个分区时,此方法需要触发spark作业.
>>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3)]
Run Code Online (Sandbox Code Playgroud)
并用于map转换RDD以使索引位于新RDD之前
这是未经测试的,因为我没有任何环境:
test1 = (('trial1',[1,2]),('trial2',[3,4]))
test1RDD = sc.parallelize(test1)
test1RDD.zipWithIndex().map(lambda x : (x[1],x[0]))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1950 次 |
| 最近记录: |