按键值分组 pyspark

Dan*_*edu 3 python hdfs apache-spark pyspark

我正在尝试使用 apache spark (pyspark) 对值 (key, value) 进行分组。我设法按键进行分组,但在内部我想对值进行分组,如下例所示。

我需要按 cout() 对 GYEAR 列进行分组。

%pyspark

rdd1 = sc.textFile("/datos/apat63_99.txt")

rdd2 = rdd1.map(lambda line :  line.split(",") ).map(lambda l : (l[4],l[1],l[0]))

for line in rdd2.take(6):
    print(line)

######################

rdd3 = rdd2.map(lambda line:(line[0],(line[1:]) ))

rddx = rdd2.groupByKey()
rddx.take(5)

Run Code Online (Sandbox Code Playgroud)

我希望输出是:

在:

(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
(u'"BE"', u'1963', u'3070801')
(u'"BE"', u'1964', u'3070811')
(u'"US"', u'1963', u'3070802')
(u'"US"', u'1963', u'3070803')
(u'"US"', u'1963', u'3070804')
(u'"US"', u'1963', u'3070805')
(u'"US"', u'1964', u'3070807')
Run Code Online (Sandbox Code Playgroud)

出去:

(u'"BE"', [(u'1963', 1), (u'1964', 1)])
(u'"US"', [(u'1963', 4), (u'1964', 2)])
Run Code Online (Sandbox Code Playgroud)

jxc*_*jxc 5

这是 RDD 方法的一种方式:

from operator import add

# initialize the RDD
rdd = sc.parallelize([(u'"COUNTRY"', u'"GYEAR"', u'"PATENT"')
    , (u'"BE"', u'1963', u'3070801')
    , (u'"BE"', u'1964', u'3070811')
    , (u'"US"', u'1963', u'3070802')
    , (u'"US"', u'1963', u'3070803')
    , (u'"US"', u'1963', u'3070804')
    , (u'"US"', u'1963', u'3070805')
    , (u'"US"', u'1964', u'3070807')])
Run Code Online (Sandbox Code Playgroud)

请执行下列操作:

  1. 设置元组(COUNTRY, GYEAR)作为键,1作为值
  2. 使用 reduceByKey(add) 计算键
  3. 调整键COUNTRY,值到[(GYEAR, cnt)]其中CNT从先前计算reduceByKey
  4. 运行reduceByKey(add)以合并具有相同键 ( COUNTRY)的列表。
  5. 使用过滤器删除标题

    rdd_new = rdd.map(lambda x: ((x[0],x[1]), 1) ) \
                 .reduceByKey(add) \
                 .map(lambda x: (x[0][0], [(x[0][1],x[1])])) \
                 .reduceByKey(add) \
                 .filter(lambda x: x[0] != '"COUNTRY"')
    
    Run Code Online (Sandbox Code Playgroud)

检查结果:

>>> rdd_new.take(2)
[(u'"US"', [(u'1964', 1), (u'1963', 4)]),
 (u'"BE"', [(u'1963', 1), (u'1964', 1)])]
Run Code Online (Sandbox Code Playgroud)