Dan*_*edu 3 python hdfs apache-spark pyspark
我正在尝试使用 apache spark (pyspark) 对值 (key, value) 进行分组。我设法按键进行分组,但在内部我想对值进行分组,如下例所示。
我需要按 cout() 对 GYEAR 列进行分组。
%pyspark
rdd1 = sc.textFile("/datos/apat63_99.txt")
rdd2 = rdd1.map(lambda line : line.split(",") ).map(lambda l : (l[4],l[1],l[0]))
for line in rdd2.take(6):
print(line)
######################
rdd3 = rdd2.map(lambda line:(line[0],(line[1:]) ))
rddx = rdd2.groupByKey()
rddx.take(5)
Run Code Online (Sandbox Code Playgroud)
我希望输出是:
在:
(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
(u'"BE"', u'1963', u'3070801')
(u'"BE"', u'1964', u'3070811')
(u'"US"', u'1963', u'3070802')
(u'"US"', u'1963', u'3070803')
(u'"US"', u'1963', u'3070804')
(u'"US"', u'1963', u'3070805')
(u'"US"', u'1964', u'3070807')
Run Code Online (Sandbox Code Playgroud)
出去:
(u'"BE"', [(u'1963', 1), (u'1964', 1)])
(u'"US"', [(u'1963', 4), (u'1964', 2)])
Run Code Online (Sandbox Code Playgroud)
这是 RDD 方法的一种方式:
from operator import add
# initialize the RDD
rdd = sc.parallelize([(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
, (u'"BE"', u'1963', u'3070801')
, (u'"BE"', u'1964', u'3070811')
, (u'"US"', u'1963', u'3070802')
, (u'"US"', u'1963', u'3070803')
, (u'"US"', u'1963', u'3070804')
, (u'"US"', u'1963', u'3070805')
, (u'"US"', u'1964', u'3070807')])
Run Code Online (Sandbox Code Playgroud)
请执行下列操作:
(COUNTRY, GYEAR)
作为键,1
作为值COUNTRY
,值到[(GYEAR, cnt)]
其中CNT从先前计算reduceByKeyreduceByKey(add)
以合并具有相同键 ( COUNTRY
)的列表。使用过滤器删除标题
rdd_new = rdd.map(lambda x: ((x[0],x[1]), 1) ) \
.reduceByKey(add) \
.map(lambda x: (x[0][0], [(x[0][1],x[1])])) \
.reduceByKey(add) \
.filter(lambda x: x[0] != '"COUNTRY"')
Run Code Online (Sandbox Code Playgroud)检查结果:
>>> rdd_new.take(2)
[(u'"US"', [(u'1964', 1), (u'1963', 4)]),
(u'"BE"', [(u'1963', 1), (u'1964', 1)])]
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5463 次 |
最近记录: |