PySpark reduceByKey? Adding key/tuples

the*_*ing 8 python apache-spark pyspark

I have the following data, and what I want to do is

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]

count the instances of each value (a single string character) per key. So first I did a map:

.map(lambda x: (x[0], [x[1], 1]))

which now turns it into the following key/tuple pairs:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])]

I just can't figure out the last part: how to count the instances of each letter per key. For example, key 13 would have 1 D and 1 A, while key 14 would have 2 T's, and so on.

ohr*_*uus 6

I'm more familiar with Spark in Scala, so there may be better ways than Counter to count the characters in the iterable produced by groupByKey, but here's one option:

from collections import Counter

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
# Gather all characters per key, then count them with a Counter.
rdd.groupByKey().mapValues(Counter).collect()

[(48, Counter({'0': 2})),
 (32, Counter({'6': 2})),
 (49, Counter({'2': 2})),
 (50, Counter({'0': 2})),
 (51, Counter({'X': 1, 'T': 1})),
 (53, Counter({'2': 1})),
 (13, Counter({'A': 1, 'D': 1})),
 (45, Counter({'A': 1, 'T': 1})),
 (14, Counter({'T': 2})),
 (54, Counter({'0': 1})),
 (47, Counter({'2': 2}))]

  • Oh, you already used Counter! Unfortunately, groupByKey should be avoided since it shuffles all of the raw data. And two operations instead of one isn't ideal either. But +1 for compactness! (A reduceByKey-based sketch follows these comments.) (3 upvotes)
  • http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html (3 upvotes)
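
Following that comment's advice, here is a minimal reduceByKey-based variation (my own sketch, not part of the answer above) that still produces one Counter per key. Each character is wrapped in a one-element Counter, and Counters are then merged pairwise; because reduceByKey combines partial results on each partition before the shuffle, the raw values never need to be grouped in one place.

from collections import Counter

# Wrap each character in a one-element Counter, then merge Counters pairwise.
rdd.mapValues(lambda c: Counter([c])).reduceByKey(lambda a, b: a + b).collect()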

src*_*aha 5

Instead of:

.map(lambda x: (x[0], [x[1], 1]))

we can do this:

.map(lambda x: ((x[0], x[1]), 1))

In the last step, we can use reduceByKey with add. Note that add comes from the operator module.

Putting it all together:

from operator import add
rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
# Key by the (id, char) pair so reduceByKey can sum the 1s per pair.
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
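
The result here is keyed by (id, character) pairs, e.g. ((13, 'D'), 1). As a possible follow-up (my own sketch, not part of the original answer): if you want the counts regrouped per key, like the Counter output in the other answer, one more map and groupByKey over the already-reduced, much smaller data will do:

counts = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add)
# Re-key by the original id and collect each key's (char, count) pairs into a dict.
counts.map(lambda kv: (kv[0][0], (kv[0][1], kv[1]))).groupByKey().mapValues(dict).collect()
# e.g. [(13, {'D': 1, 'A': 1}), (14, {'T': 2}), ...]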