在pySpark中从dict构建一行

Jef*_*eff 23 python apache-spark pyspark

我正在尝试在pySpark 1.6.1中动态构建一行,然后将其构建为数据帧.一般的想法是将结果扩展describe到包括例如偏斜和峰度.这是我认为应该工作的:

from pyspark.sql import Row

row_dict = {'C0': -1.1990072635132698,
            'C3': 0.12605772684660232,
            'C4': 0.5760856026559944,
            'C5': 0.1951877800894315,
            'C6': 24.72378589441825,
            'summary': 'kurtosis'}

new_row = Row(row_dict)
Run Code Online (Sandbox Code Playgroud)

但这会返回TypeError: sequence item 0: expected string, dict found一个相当明显的错误.然后我发现如果我先定义Row字段,我可以使用dict:

r = Row('summary', 'C0', 'C3', 'C4', 'C5', 'C6')
r(row_dict)
> Row(summary={'summary': 'kurtosis', 'C3': 0.12605772684660232, 'C0': -1.1990072635132698, 'C6': 24.72378589441825, 'C5': 0.1951877800894315, 'C4': 0.5760856026559944})
Run Code Online (Sandbox Code Playgroud)

这将是一个很好的步骤,除了它似乎我不能动态指定字段Row.我需要这个来处理未知名称的未知行数.根据文档,您实际上可以采用另一种方式:

>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True
Run Code Online (Sandbox Code Playgroud)

所以看起来我应该能够做到这一点.此外,似乎还有一些旧版本可能会弃用的功能,例如此处.我缺少一个更新的等价物吗?

zer*_*323 31

您可以使用关键字参数解压缩,如下所示:

Row(**row_dict)

## Row(C0=-1.1990072635132698, C3=0.12605772684660232, C4=0.5760856026559944, 
##     C5=0.1951877800894315, C6=24.72378589441825, summary='kurtosis')
Run Code Online (Sandbox Code Playgroud)

值得注意的是,它通过密钥内部对数据进行排序,以解决旧版Python的问题.


rya*_*yan 7

如果 dict 没有变平,您可以递归地将 dict 转换为 Row。

def as_row(obj):
    if isinstance(obj, dict):
        dictionary = {k: as_row(v) for k, v in obj.items()}
        return Row(**dictionary)
    elif isinstance(obj, list):
        return [as_row(v) for v in obj]
    else:
        return obj
Run Code Online (Sandbox Code Playgroud)