从Pyspark中的RDD中提取字典

Rom*_*dgz 6 python apache-spark pyspark

这是一个功课问题:

我有RDD一个集合os元组.我还有从每个输入元组返回字典的函数.不知何故,与减少功能相反.

通过地图,我可以很容易地从一个RDD元组到一个RDD字典.但是,由于字典是(键,值)对的集合,我想将RDD字典转换为RDD每个字典内容的(键,值)元组.

这样,如果我RDD包含10个元组,那么我得到一个RDD包含5个元素的10个字典(例如),最后我得到RDD50个元组.

我认为这必须是可能的但是,怎么样?(也许问题是我不知道这个操作是如何用英语调用的)

小智 14

我的2美分:

有一个名为"collectAsMap"的PairRDD函数,它从RDD返回一个字典.

让我举个例子:

sample = someRDD.sample(0, 0.0001, 0)
sample_dict = sample.collectAsMap()
print sample.collect()
print sample_dict

[('hi', 4123.0)]
{'hi': 4123.0}
Run Code Online (Sandbox Code Playgroud)

文档在这里

希望能帮助到你!问候!


zer*_*323 6

我想你想要的只是一个flatMap:

dicts = sc.parallelize([{"foo": 1, "bar": 2}, {"foo": 3, "baz": -1, "bar": 5}])
dicts.flatMap(lambda x: x.items())
Run Code Online (Sandbox Code Playgroud)

flatMap从RDD元素获取函数到iterable,然后连接结果.Spark上下文之外的相同类型操作的另一个名称是mapcat:

>>> from toolz.curried import map, mapcat, concat, pipe
>>> from itertools import repeat
>>> pipe(range(4), mapcat(lambda i: repeat(i, i + 1)), list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]
Run Code Online (Sandbox Code Playgroud)

或者一步一步走:

>>> pipe(range(4), map(lambda i: repeat(i, i + 1)), concat, list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]
Run Code Online (Sandbox Code Playgroud)

使用同样的事情 itertools.chain

>>> from itertools import chain
>>> pipe((repeat(i, i + 1) for i in  range(4)), chain.from_iterable, list)
>>> [0, 1, 1, 2, 2, 2, 3, 3, 3, 3]
Run Code Online (Sandbox Code Playgroud)