我有一个关于包含一系列 json 字符串的 PySpark DataFrame 中的行的难题。
问题围绕着每一行可能包含与另一行不同的模式,因此当我想在 PySpark 中将所述行转换为可下标的数据类型时,我需要有一个“统一”模式。
例如,考虑这个数据框
import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))
Run Code Online (Sandbox Code Playgroud)
请注意,每一行都包含不同版本的 json 字符串。为了解决这个问题,我做了以下转换
import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame
def convert_to_row(d: dict) -> Row:
"""Convert a dictionary to a SparkRow.
Parameters
----------
d : dict
Dictionary to convert.
Returns
-------
Row
"""
return Row(**OrderedDict(sorted(d.items())))
def get_schema_from_dictionary(the_dict: dict):
"""Create a schema from a dictionary.
Parameters
----------
the_dict : dict
Returns
-------
schema
Schema understood by PySpark.
"""
return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema
def get_universal_schema(df: SparkDataFrame, column: str):
"""Given a dataframe, retrieve the "global" schema for the column.
NOTE: It does this by merging across all the rows, so this will
take a long time for larger dataframes.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
mega_dict = {}
for value in col_values:
mega_dict = {**mega_dict, **value}
return get_schema_from_dictionary(mega_dict)
def get_sample_schema(df, column):
"""Given a dataframe, sample a single value to convert.
NOTE: This assumes that the dataframe has the same schema
over all rows.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
Returns
-------
schema
Schema understood by PySpark.
"""
return get_universal_schema(df.limit(1), column)
def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
"""Convert json-string column to a subscriptable object.
Parameters
----------
df : SparkDataFrame
Dataframe containing the column
column : str
Column to parse.
manual_schema : PysparkSchema, optional
Schema understood by PySpark, by default None
merge : bool, optional
Parse the whole dataframe to extract a global schema, by default False
Returns
-------
SparkDataFrame
"""
if manual_schema is None or manual_schema == {}:
if merge:
schema = get_universal_schema(df, column)
else:
schema = get_sample_schema(df, column)
else:
schema = manual_schema
return df.withColumn(column, fcn.from_json(column, schema))
Run Code Online (Sandbox Code Playgroud)
然后,我可以简单地执行以下操作,以获取具有统一架构的新数据框
df = from_json(df, column='B', merge=True)
df.printSchema()
root
|-- A: long (nullable = true)
|-- B: struct (nullable = true)
| |-- a: long (nullable = true)
| |-- b: string (nullable = true)
| |-- c: long (nullable = true)
| |-- d: double (nullable = true)
| |-- f: struct (nullable = true)
| | |-- maybe_this: long (nullable = true)
| | |-- some_other: struct (nullable = true)
| | | |-- A: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)
现在我们来到了问题的关键。由于我在这里执行此操作,因此col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]我仅限于主节点上的内存量。
在我收集到主节点之前,如何执行类似的程序,将工作更多地分配给每个工作人员?
如果我正确理解你的问题,因为我们可以使用RDD作为spark.read.json()方法path的参数,并且RDD是分布式的,可以减少在大型数据集上使用方法潜在的OOM问题,因此你可以尝试调整函数至以下内容:collect()get_universal_schema
def get_universal_schema(df: SparkDataFrame, column: str):
return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema
Run Code Online (Sandbox Code Playgroud)
并保持两个功能:get_sample_schema()和from_json()原样。
| 归档时间: |
|
| 查看次数: |
512 次 |
| 最近记录: |