包含字典的 pyspark 数据框列的总和

Par*_*kar 2 python apache-spark apache-spark-sql pyspark

我有一个数据框,仅包含一列,其中包含 类型的元素MapType(StringType(), IntegerType())。我想获得该列的累积和,其中该sum操作意味着添加两个字典。

最小的例子

a = [{'Maps': ({'a': 1, 'b': 2, 'c': 3})}, {'Maps': ({'a': 2, 'b': 4, 'd': 6})}]
df = spark.createDataFrame(a)
df.show(5, False)

+---------------------------+
|Maps                       |
+---------------------------+
|Map(a -> 1, b -> 2, c -> 3)|
|Map(a -> 2, b -> 4, d -> 6)|
+---------------------------+
Run Code Online (Sandbox Code Playgroud)

如果我要获得该列的累积和Maps,我应该得到以下结果。

+-----------------------------------+
|Maps                               |
+-----------------------------------+
|Map(a -> 3, b -> 6, c -> 3, d -> 6)|
+-----------------------------------+
Run Code Online (Sandbox Code Playgroud)

PS我使用的是Python 2.6,所以collections.Counter不可用。如果绝对必要的话我可能可以安装它。

我的尝试:

我尝试过一种accumulator基于方法和一种使用fold.

累加器

def addDictFun(x):
    global v
    v += x

class DictAccumulatorParam(AccumulatorParam):
    def zero(self, d):
        return d
    def addInPlace(self, d1, d2):
        for k in d1:
            d1[k] = d1[k] + (d2[k] if k in d2 else 0)
        for k in d2:
            if k not in d1:
                d1[k] = d2[k]
        return d1

v = sc.accumulator(MapType(StringType(), IntegerType()), DictAccumulatorParam())
cumsum_dict = df.rdd.foreach(addDictFun)
Run Code Online (Sandbox Code Playgroud)

现在最后,我应该将生成的字典保存在v. 相反,我得到的错误MapType是不可迭代的(主要是for k in d1在函数中的行上addInPlace)。

rdd 折叠

基于的方法rdd.fold如下:

def add_dicts(d1, d2):
    for k in d1:
        d1[k] = d1[k] + (d2[k] if k in d2 else 0)
    for k in d2:
        if k not in d1:
            d1[k] = d2[k]
    return d1

cumsum_dict = df.rdd.fold(MapType(StringType(), IntegerType()), add_dicts)
Run Code Online (Sandbox Code Playgroud)

但是,我在这里遇到同样的MapType is not iterable错误。知道我哪里出错了吗?

hi-*_*zir 5

pyspark.sql.typesfold是模式描述符,而不是集合或外部语言表示,因此不能与或 一起使用Accumulator

最直接的解决方案是explode聚合

from pyspark.sql.functions import explode

df = spark.createDataFrame(
    [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 4, 'd': 6}], 
    "map<string,integer>"
).toDF("Maps")

df.select(explode("Maps")).groupBy("key").sum("value").rdd.collectAsMap()
# {'d': 6, 'c': 3, 'b': 6, 'a': 3}  
Run Code Online (Sandbox Code Playgroud)

RDD你一起可以做类似的事情:

from operator import add

df.rdd.flatMap(lambda row: row.Maps.items()).reduceByKey(add).collectAsMap()
# {'b': 6, 'c': 3, 'a': 3, 'd': 6}
Run Code Online (Sandbox Code Playgroud)

或者如果你真的想要fold

from operator import attrgetter
from collections import defaultdict

def merge(acc, d):
    for k in d:
        acc[k] += d[k]
    return acc

df.rdd.map(attrgetter("Maps")).fold(defaultdict(int), merge)
# defaultdict(int, {'a': 3, 'b': 6, 'c': 3, 'd': 6})
Run Code Online (Sandbox Code Playgroud)