使用Apache Spark将键值对减少为键列表对

Tra*_*isJ 42 python mapreduce apache-spark rdd pyspark

我正在编写一个Spark应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个Key-Multivalue对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))
Run Code Online (Sandbox Code Playgroud)

发生这种情况时我得到的错误是:

'NoneType'对象没有attribue'追加'.

我的键是整数,值V1,...,Vn是元组.我的目标是使用密钥和值列表(元组)创建一对.

Chr*_*fer 50

Map和ReduceByKey

输入类型和输出类型reduce必须相同,因此如果要聚合列表,则必须map输入列表.然后将列表合并到一个列表中.

结合清单

您需要一种方法将列表组合到一个列表中.Phyton提供了一些组合列表的方法.

append修改第一个列表并始终返回None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]
Run Code Online (Sandbox Code Playgroud)

extend 做同样的,但打开列表:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)

两种方法都返回None,但是您需要一个返回组合列表的方法,因此只需使用加号.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]
Run Code Online (Sandbox Code Playgroud)

火花

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
         .map(lambda actor: (actor.split(",")[0], actor)) \ 

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)
Run Code Online (Sandbox Code Playgroud)

CombineByKey

也可以通过combineByKey内部使用来解决这个问题reduceByKey,但它更复杂,并且"使用Spark中的一个专用的每键组合器可以更快".您的用例对于上层解决方案来说非常简单.

GroupByKey

也可以解决这个问题groupByKey,但它减少了并行化,因此对于大数据集来说可能要慢得多.


alr*_*ich 14

我对谈话有点迟,但这是我的建议:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
Run Code Online (Sandbox Code Playgroud)


zer*_*323 14

tl; dr如果你真的需要这样的操作,请按照@MariusIon的groupByKey 建议使用.与直接分组相比,这里提出的每个其他解决方案要么直率低效,至少是次优的.

reduceByKey 列表连接不是一个可接受的解决方案,因为:

  • 需要初始化O(N)列表.
  • +对一对列表的每个应用都要求两个列表(O(N))的完整副本有效地将总体复杂度增加到O(N 2).
  • 没有解决由此引入的任何问题groupByKey.必须改组的数据量以及最终结构的大小是相同的.
  • 其中一个答案所建议的不同,使用reduceByKey和实现之间的并行度水平没有区别groupByKey.

combineByKeywith list.extend是一个次优解决方案,因为:

  • 创建O(N)列表对象MergeValue(可以通过list.append直接在新项目上使用来优化).
  • 如果使用list.append它进行优化,则完全等同于a的旧(Spark <= 1.3)实现,groupByKey并忽略SPARK-3074引入的所有优化,这些优化实现了大于内存结构的外部(磁盘上)分组.


Mar*_*Ion 11

您可以使用RDD groupByKey方法.

输入:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()
Run Code Online (Sandbox Code Playgroud)

输出:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
Run Code Online (Sandbox Code Playgroud)

  • 不鼓励使用`groupByKey`,因为它会导致过度的混乱.您应该使用`reduceByKey`([请参阅此链接](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html))或`combineByKey`,如@ Christian_Strempfer (11认同)