将pyspark.sql.dataframe.DataFrame类型转换为Dictionary

Har*_*pta 6 python dictionary apache-spark pyspark

我有一个pyspark Dataframe,我需要将其转换为python字典.

下面的代码是可重现的:

from pyspark.sql import Row
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),Row(name='Alice', age=5, height=80),Row(name='Alice', age=10, height=80)])
df = rdd.toDF()
Run Code Online (Sandbox Code Playgroud)

一旦我有了这个数据帧,我需要将它转换为字典.

我试过这样的

df.set_index('name').to_dict()
Run Code Online (Sandbox Code Playgroud)

但它给出了错误.我怎样才能做到这一点

mto*_*oto 14

您需要先转换为pandas.DataFrame使用toPandas(),然后您可以使用to_dict()转置数据框上的方法orient='list':

df.toPandas().set_index('name').T.to_dict('list')
# Out[1]: {u'Alice': [10, 80]}
Run Code Online (Sandbox Code Playgroud)

  • 这就是为什么你应该在你的问题中分享预期的输出,为什么年龄为"5"而不是"10"?如果您打算创建字典,则应该有唯一的记录. (3认同)
  • 我不鼓励在这里使用熊猫.熊猫是一个很大的依赖,并不是这么简单的操作所必需的. (2认同)

Fok*_*ong 12

请看下面的例子:

>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
            .map(lambda line: line.split(","))
            .toDF(['name','age','height'])
            .select(col('name'), col('age').cast('int'), col('height').cast('int')))

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|  Bob|  5|    80|
|Alice| 10|    80|
+-----+---+------+

>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
    {'age': 5, 'name': u'Alice', 'height': 80}, 
    {'age': 5, 'name': u'Bob', 'height': 80}, 
    {'age': 10, 'name': u'Alice', 'height': 80}
]

>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}
Run Code Online (Sandbox Code Playgroud)

我用来测试的输入data.txt:

Alice,5,80
Bob,5,80
Alice,10,80
Run Code Online (Sandbox Code Playgroud)

首先,我们通过阅读线条使用pyspark进行加载.然后我们通过在逗号上拆分将行转换为列.然后我们将原生RDD转换为DF并将名称添加到colume中.最后,我们将列转换为适当的格式.

然后我们收集驱动程序的所有内容,并使用一些python列表理解我们将数据转换为首选的表单.我们Row使用该asDict()方法将对象转换为字典.在输出中我们可以观察到Alice只出现一次,但这当然是因为Alice的密钥被覆盖了.

请记住,在将结果返回给驱动程序之前,您希望在pypspark中进行所有处理和过滤.

希望这会有所帮助,欢呼.

  • Fokko,您好,list_persons的打印为我渲染了“ <地图对象在0x7f09000baf28>”。有什么帮助吗? (2认同)
  • 将列表包裹在地图上,即 list_persons = list(map(lambda row: row.asDict(), df.collect())) (2认同)

Ada*_*han 5

RDD 内置了 asDict() 函数,它允许将每一行表示为一个 dict。

如果您有数据帧 df,则需要将其转换为 rdd 并应用 asDict()。

new_rdd = df.rdd.map(lambda row: row.asDict(True))
Run Code Online (Sandbox Code Playgroud)

然后可以使用 new_rdd 执行正常的 python 映射操作,例如:

# You can define normal python functions like below and plug them when needed
def transform(row):
    # Add a new key to each row
    row["new_key"] = "my_new_value"
    return row

new_rdd = new_rdd.map(lambda row: transform(row))
Run Code Online (Sandbox Code Playgroud)