I'd like to know the best practice for reading a newline-delimited JSON file into a DataFrame. Crucially, one of the (required) fields in each record maps to an object that is not guaranteed to have the same subfields across records (i.e., the schema is not uniform).
For example, the input file might look like this:
{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}
{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}
{"id": 3, "type": "baz", "data": {"key3": "moo"}}
In this case the fields id, type, and data will be present in every record, but the struct that data maps to has a heterogeneous schema.
I have two approaches in mind for dealing with the non-uniformity of the data column. The first is to let Spark sample the data and infer a schema that is the superset of all records:
df = spark.read.options(samplingRatio=1.0).json('s3://bucket/path/to/newline_separated_json.txt')
The obvious problem with this approach is that every record has to be sampled in order to determine the superset of fields that makes up the final schema. Given that the dataset runs to many millions of records, this could presumably be very expensive (though perhaps the sampling ratio can be dialled down, as sketched below)?
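For instance, I could presumably trade schema completeness for cost by lowering the sampling ratio (a rough sketch, same S3 path as above), at the risk of dropping any key of data that only appears in unsampled records:
# Infer the schema from a 10% sample rather than scanning every record.
# Risk: keys of `data` that occur only in unsampled records are lost.
df = spark.read.options(samplingRatio=0.1).json('s3://bucket/path/to/newline_separated_json.txt')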
Or... the second is to declare the schema up front, with the fields id, type, and data. Here I'm not sure of the best way to proceed. For example, I assume that simply declaring the data field as a string, as below, won't work, because nothing explicitly performs the equivalent of json.dumps?
from pyspark.sql.types import StringType, StructType, StructField
schema = StructType([
StructField("id", StringType(), true),
StructField("type", StringType(), true),
StructField("data", StringType(), true)
])
df = spark.read.json('s3://bucket/path/to/newline_separated_json.txt', schema=schema)
If I want to avoid the full-dataset scan that option 1 incurs, what is the best way to ingest this file while keeping the data field as a JSON string?
Thanks!
I think your attempts and the overall idea are on the right track. Here are two more approaches based on built-in options, namely get_json_object()/from_json() via the DataFrame API, and a map transformation together with Python's json.loads() and json.dumps() via the RDD API.
Option 1: get_json_object() / from_json()
First let's try get_json_object(), which doesn't require a schema:
from pyspark.sql.types import StringType
import pyspark.sql.functions as f
df = spark.createDataFrame([
('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())
df.select(f.get_json_object("value", "$.id").alias("id"),
          f.get_json_object("value", "$.type").alias("type"),
          f.get_json_object("value", "$.data").alias("data")) \
  .show(10, False)
# +---+----+-----------------------------+
# |id |type|data |
# +---+----+-----------------------------+
# |1 |foo |{"key0":"foo","key2":"meh"} |
# |2 |bar |{"key2":"poo","key3":"pants"}|
# |3 |baz |{"key3":"moo"} |
# +---+----+-----------------------------+
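The same approach works directly against the file from the question: read each line as plain text first, then extract the fields. A minimal sketch, assuming the S3 path from the question and reusing the functions import from above:
# spark.read.text() yields a single string column named "value", one row per line
raw = spark.read.text('s3://bucket/path/to/newline_separated_json.txt')
raw.select(f.get_json_object("value", "$.id").alias("id"),
           f.get_json_object("value", "$.type").alias("type"),
           f.get_json_object("value", "$.data").alias("data"))  # data stays a JSON string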
from_json(), by contrast, requires a schema definition:
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f
df = spark.createDataFrame([
('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())
schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("data", StringType(), True)
])
df.select(f.from_json("value", schema).getItem("id").alias("id"),
          f.from_json("value", schema).getItem("type").alias("type"),
          f.from_json("value", schema).getItem("data").alias("data")) \
  .show(10, False)
# +---+----+-----------------------------+
# |id |type|data |
# +---+----+-----------------------------+
# |1 |foo |{"key0":"foo","key2":"meh"} |
# |2 |bar |{"key2":"poo","key3":"pants"}|
# |3 |baz |{"key3":"moo"} |
# +---+----+-----------------------------+
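A note on the design: the select above invokes from_json() once per extracted field, which may parse each JSON string several times unless the optimizer deduplicates the expression. A single call whose struct result is expanded with a star should be equivalent and arguably cleaner (a sketch, reusing df and schema from above):
# Parse once into a struct column, then expand it into top-level columns
df.select(f.from_json("value", schema).alias("parsed")).select("parsed.*").show(10, False)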
Option 2: map / RDD API + json.dumps()
from pyspark.sql.types import StringType, StructType, StructField
import json
df = spark.createDataFrame([
'{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
'{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
'{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())
def from_json(data):
    # data is a Row with a single string field holding the raw JSON line
    row = json.loads(data[0])
    # cast id to str so the tuple matches the all-StringType schema below
    return (str(row['id']), row['type'], json.dumps(row['data']))
json_rdd = df.rdd.map(from_json)
schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("data", StringType(), True)
])
spark.createDataFrame(json_rdd, schema).show(10, False)
# +---+----+--------------------------------+
# |id |type|data |
# +---+----+--------------------------------+
# |1 |foo |{"key2": "meh", "key0": "foo"} |
# |2 |bar |{"key2": "poo", "key3": "pants"}|
# |3 |baz |{"key3": "moo"} |
# +---+----+--------------------------------+
The from_json function turns each string row into a tuple (id, type, data): json.loads() parses the JSON string into a dictionary, from which we build and return the final tuple. json.dumps() then serializes the heterogeneous data dictionary back into a JSON string, which is what lets it fit the fixed StringType column.
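A nice property of keeping data as a JSON string is that its subfields can still be queried lazily later on, for example with get_json_object(). A sketch against the DataFrame built above (key2 is one of the keys from the example records):
import pyspark.sql.functions as f

result = spark.createDataFrame(json_rdd, schema)
result.select("id", f.get_json_object("data", "$.key2").alias("key2")).show()
# returns "meh", "poo" and null for the three example records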