将数据帧转换为JSON(在pyspark中),然后选择所需的字段

xn1*_*139 8 python json apache-spark pyspark

我是Spark的新手.我有一个包含某些分析结果的数据框.我将该数据帧转换为JSON,因此我可以在Flask App中显示它:

results = result.toJSON().collect()
Run Code Online (Sandbox Code Playgroud)

我的json文件中的示例条目如下.然后我尝试运行for循环以获得特定结果:

{"userId":"1","systemId":"30","title":"interest"}

for i in results:
    print i["userId"]
Run Code Online (Sandbox Code Playgroud)

这根本不起作用,我得到的错误如:Python(json):TypeError:期望的字符串或缓冲区

我用json.dumpsjson.loads,仍然一无所获-我不断收到错误如字符串索引必须是整数,以及上述错误.

然后我尝试了这个:

  print i[0]
Run Code Online (Sandbox Code Playgroud)

这给了我json"{"而不是第一行的第一个字符.我真的不知道该怎么办,谁能告诉我哪里出错了?

非常感谢.

Bal*_*ala 12

import json
>>> df = sqlContext.read.table("n1")
>>> df.show()
+-----+-------+----+---------------+-------+----+
|   c1|     c2|  c3|             c4|     c5|  c6|
+-----+-------+----+---------------+-------+----+
|00001|Content|   1|Content-article|       |2018|
|00002|Content|null|Content-article|Content|2015|
+-----+-------+----+---------------+-------+----+

>>> results = df.toJSON().map(lambda j: json.loads(j)).collect()
>>> for i in results: print i["c1"], i["c6"]
... 
00001 2018
00002 2015
Run Code Online (Sandbox Code Playgroud)

  • 这非常有帮助。感谢您提供一个易于遵循的简单解决方案。 (2认同)

Zei*_*ist 9

这是对我有用的:

df_json = df.toJSON()

for row in df_json.collect():
    #json string
    print(row) 

    #json object
    line = json.loads(row) 
    print(line[some_key]) 
Run Code Online (Sandbox Code Playgroud)

请记住,使用 .collect() 是不可取的,因为它收集分布式数据帧,并且违背了使用数据帧的目的。


All*_*ter 8

如果结果result.toJSON().collect()是JSON编码的字符串,那么您将使用json.loads()它将其转换为dict.您遇到的问题是,当您使用循环迭代a dictfor,您将获得该键的键dict.在你的for循环中,你将关键视为a dict,而实际上它只是一个string.试试这个:

# toJSON() turns each row of the DataFrame into a JSON string
# calling first() on the result will fetch the first row.
results = json.loads(result.toJSON().first())

for key in results:
    print results[key]

# To decode the entire DataFrame iterate over the result
# of toJSON()

def print_rows(row):
    data = json.loads(row)
    for key in data:
        print "{key}:{value}".format(key=key, value=data[key])


results = result.toJSON()
results.foreach(print_rows)    
Run Code Online (Sandbox Code Playgroud)

编辑:问题是collect返回a list,而不是a dict.我已经更新了代码.始终阅读文档.

collect()返回包含此RDD中所有元素的列表.

注意仅当期望结果数组很小时才应使用此方法,因为所有数据都已加载到驱动程序的内存中.

EDIT2:我不能强调,总是阅读文档.

编辑3:这里.


MrE*_*MrE 6

要获取 python 字典数组:

results = df.toJSON().map(json.loads).collect()
Run Code Online (Sandbox Code Playgroud)

要获取 JSON 字符串数组:

results = df.toJSON().collect()
Run Code Online (Sandbox Code Playgroud)

获取 JSON 字符串(即数组的 JSON 字符串):

results = df.toPandas().to_json(orient='records')
Run Code Online (Sandbox Code Playgroud)

并使用它来获取 Python 字典数组:

results = json.loads(df.toPandas().to_json(orient='records'))
Run Code Online (Sandbox Code Playgroud)