use*_*030 7 python apache-spark rdd pyspark
我想按值分组,然后使用PySpark在每个组中找到最大值.我有以下代码,但现在我有点不知道如何提取最大值.
# some file contains tuples ('user', 'item', 'occurrences')
data_file = sc.textData('file:///some_file.txt')
# Create the triplet so I index stuff
data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2])))
# Group by the user i.e. r[0]
grouped = data_file.groupBy(lambda r: r[0])
# Here is where I am stuck
group_list = grouped.map(lambda x: (list(x[1]))) #?
Run Code Online (Sandbox Code Playgroud)
返回类似于:
[[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]]
Run Code Online (Sandbox Code Playgroud)
我想现在为每个用户找到最大'发生'.执行最大值后的最终结果将导致RDD看起来像这样:
[[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]]
Run Code Online (Sandbox Code Playgroud)
其中只有最大数据集将保留给文件中的每个用户.换句话说,我想将RDD 的值更改为仅包含每个用户最多出现的一个三元组.
zer*_*323 12
这里没有必要groupBy.简单reduceByKey会做得很好,大部分时间会更有效率:
data_file = sc.parallelize([
(u'u1', u's1', 20), (u'u1', u's2', 5),
(u'u2', u's3', 5), (u'u2', u's2', 10)])
max_by_group = (data_file
.map(lambda x: (x[0], x)) # Convert to PairwiseRD
# Take maximum of the passed arguments by the last element (key)
# equivalent to:
# lambda x, y: x if x[-1] > y[-1] else y
.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
.values()) # Drop keys
max_by_group.collect()
## [('u2', 's2', 10), ('u1', 's1', 20)]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
13673 次 |
| 最近记录: |