Tra*_*isJ 42 python mapreduce apache-spark rdd pyspark
我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:
My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)
发生这种情况时我得到的错误是:
'NoneType'对象没有attribue'追加'.
我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.
Chr*_*fer 50
Map和ReduceByKey
输入类型和输出类型reduce必须相同,因此如果要聚合列表,则必须map输入列表.然后将列表合并到一个列表中.
结合清单
您需要一种方法将列表组合到一个列表中.Phyton提供了一些组合列表的方法.
append修改第一个列表并始终返回None.
x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]
Run Code Online (Sandbox Code Playgroud)
extend 做同样的,但打开列表:
x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)
两种方法都返回None,但是您需要一个返回组合列表的方法,因此只需使用加号.
x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)
火花
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
.map(lambda actor: (actor.split(",")[0], actor)) \
# transform each value into a list
.map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \
# combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
.reduceByKey(lambda a, b: a + b)
Run Code Online (Sandbox Code Playgroud)
CombineByKey
也可以通过combineByKey内部使用来解决这个问题reduceByKey,但它更复杂,并且"使用Spark中的一个专用的每键组合器可以更快".您的用例对于上层解决方案来说非常简单.
GroupByKey
也可以解决这个问题groupByKey,但它减少了并行化,因此对于大数据集来说可能要慢得多.
alr*_*ich 14
我对谈话有点迟,但这是我的建议:
>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
Run Code Online (Sandbox Code Playgroud)
zer*_*323 14
tl; dr如果你真的需要这样的操作,请按照@MariusIon的groupByKey 建议使用.与直接分组相比,这里提出的每个其他解决方案要么直率低效,至少是次优的.
reduceByKey 列表连接不是一个可接受的解决方案,因为:
+对一对列表的每个应用都要求两个列表(O(N))的完整副本有效地将总体复杂度增加到O(N 2).groupByKey.必须改组的数据量以及最终结构的大小是相同的.reduceByKey和实现之间的并行度水平没有区别groupByKey.combineByKeywith list.extend是一个次优解决方案,因为:
MergeValue(可以通过list.append直接在新项目上使用来优化).list.append它进行优化,则完全等同于a的旧(Spark <= 1.3)实现,groupByKey并忽略SPARK-3074引入的所有优化,这些优化实现了大于内存结构的外部(磁盘上)分组.Mar*_*Ion 11
您可以使用RDD groupByKey方法.
输入:
data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()
Run Code Online (Sandbox Code Playgroud)
输出:
[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
81753 次 |
| 最近记录: |