Par*_*kar 2 python apache-spark apache-spark-sql pyspark
我有一个数据框,仅包含一列,其中包含 类型的元素MapType(StringType(), IntegerType())。我想获得该列的累积和,其中该sum操作意味着添加两个字典。
最小的例子
a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)
+---------------------------+
|Maps |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+
Run Code Online (Sandbox Code Playgroud)
如果我要获得该列的累积和Maps,我应该得到以下结果。
+-----------------------------------+
|Maps |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+
Run Code Online (Sandbox Code Playgroud)
PS我使用的是Python 2.6,所以collections.Counter不可用。如果绝对必要的话我可能可以安装它。
我的尝试:
我尝试过一种accumulator基于方法和一种使用fold.
累加器
def addDictFun(x):
global v
v += x
class DictAccumulatorParam(AccumulatorParam):
def zero(self, d):
return d
def addInPlace(self, d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)
Run Code Online (Sandbox Code Playgroud)
现在最后,我应该将生成的字典保存在v. 相反,我得到的错误MapType是不可迭代的(主要是for k in d1在函数中的行上addInPlace)。
rdd 折叠
基于的方法rdd.fold如下:
def add_dicts(d1, d2):
for k in d1:
d1[k] = d1[k] + (d2[k] if k in d2 else 0)
for k in d2:
if k not in d1:
d1[k] = d2[k]
return d1
cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)
Run Code Online (Sandbox Code Playgroud)
但是,我在这里遇到同样的MapType is not iterable错误。知道我哪里出错了吗?
pyspark.sql.typesfold是模式描述符,而不是集合或外部语言表示,因此不能与或 一起使用Accumulator。
最直接的解决方案是explode聚合
from pyspark.sql.functions import explode
df = spark.createDataFrame(
[{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}],
"map<string,integer>"
).toDF("Maps")
df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}
Run Code Online (Sandbox Code Playgroud)
与RDD你一起可以做类似的事情:
from operator import add
df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}
Run Code Online (Sandbox Code Playgroud)
或者如果你真的想要fold
from operator import attrgetter
from collections import defaultdict
def merge(acc, d):
for k in d:
acc[k] += d[k]
return acc
df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1730 次 |
| 最近记录: |