Unifying the schema across rows of JSON strings in a Spark DataFrame

Ada*_*ted 5 python pyspark

I have a tricky problem with a PySpark DataFrame whose rows contain a series of JSON strings.

The issue is that each row may carry a different schema from the next, so when I want to convert those rows into a subscriptable data type in PySpark, I need a "unified" schema.

For example, consider this dataframe:

import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))

Note that each row contains a different version of the JSON string. To deal with this, I do the following conversion:

import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame


def convert_to_row(d: dict) -> Row:
    """Convert a dictionary to a SparkRow.

    Parameters
    ----------
    d : dict
        Dictionary to convert.

    Returns
    -------
    Row

    """
    return Row(**OrderedDict(sorted(d.items())))


def get_schema_from_dictionary(the_dict: dict):
    """Create a schema from a dictionary.

    Parameters
    ----------
    the_dict : dict

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema


def get_universal_schema(df: SparkDataFrame, column: str):
    """Given a dataframe, retrieve the "global" schema for the column.

    NOTE: It does this by merging across all the rows, so this will
          take a long time for larger dataframes.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
    mega_dict = {}
    for value in col_values:
        mega_dict = {**mega_dict, **value}

    return get_schema_from_dictionary(mega_dict)


def get_sample_schema(df, column):
    """Given a dataframe, sample a single value to convert.

    NOTE: This assumes that the dataframe has the same schema
          over all rows.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return get_universal_schema(df.limit(1), column)


def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
    """Convert json-string column to a subscriptable object.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.
    manual_schema : PysparkSchema, optional
        Schema understood by PySpark, by default None
    merge : bool, optional
        Parse the whole dataframe to extract a global schema, by default False

    Returns
    -------
    SparkDataFrame

    """
    if manual_schema is None or manual_schema == {}:
        if merge:
            schema = get_universal_schema(df, column)
        else:
            schema = get_sample_schema(df, column)
    else:
        schema = manual_schema

    return df.withColumn(column, fcn.from_json(column, schema))

Then I can simply do the following to get a new dataframe with a unified schema:

df = from_json(df, column='B', merge=True)
df.printSchema()
root
 |-- A: long (nullable = true)
 |-- B: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: double (nullable = true)
 |    |-- f: struct (nullable = true)
 |    |    |-- maybe_this: long (nullable = true)
 |    |    |-- some_other: struct (nullable = true)
 |    |    |    |-- A: long (nullable = true)
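For illustration, once B has been parsed with this unified schema its nested fields become subscriptable with ordinary dot notation. A minimal sketch reusing the example dataframe and the fcn alias from above (the nested_A alias is only illustrative):

df.select('A', 'B.a', 'B.d', fcn.col('B.f.some_other.A').alias('nested_A')).show()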

Now we come to the crux of the problem. Because I am doing col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()] here, I am limited by the amount of memory available on the master node.

How can I do something similar, but distribute more of the work to the workers, before I collect everything to the master node?

jxc*_*jxc 3

If I understand your question correctly: since we can pass an RDD as the path argument to the spark.read.json() method, and an RDD is distributed (which reduces the potential OOM problem of calling collect() on a large dataset), you can try adjusting the get_universal_schema function to the following:

def get_universal_schema(df: SparkDataFrame, column: str):
    return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema

and keep the two functions get_sample_schema() and from_json() as they are.
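
A minimal usage sketch, assuming the adjusted get_universal_schema above replaces the original definition and the example dataframe df is rebuilt from the raw JSON strings:

df = from_json(df, column='B', merge=True)   # schema inference now happens on the executors
df.printSchema()                             # should print the same unified schema as before

Since spark.read.json() infers and merges the schema across partitions on the executors, only the final merged schema is returned to the driver, so driver memory no longer scales with the number of rows.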