如何在 pyspark 中按字母顺序对嵌套结构的列进行排序?

Tul*_*ula 6 python struct apache-spark pyspark

我有以下架构的数据。我希望所有列都应按字母顺序排序。我想要它在 pyspark 数据框中。

root
 |-- _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

下面的代码仅对外部列进行排序,而不对嵌套列进行排序。

>>> cols = df.columns
>>> df2=df[sorted(cols)]
>>> df2.printSchema()
Run Code Online (Sandbox Code Playgroud)

此代码后的架构如下所示

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

(因为id处有下划线,所以它首先出现)

我想要的架构如下。(即使是地址内的列也应该排序)

root
 |-- _id: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pin: integer (nullable = true)
 |    |-- street: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

提前致谢。

pau*_*ult 3

这是一个适用于任意深度嵌套的解决方案StructType,它不依赖于硬编码任何列名。

为了进行演示,我创建了以下稍微复杂的架构,其中列内有第二层嵌套address。假设您的 DataFrameschema如下:

df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
# |-- address: struct (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- city: string (nullable = true)
# |    |-- zip: struct (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |    |    |-- first5: integer (nullable = true)
# |    |-- street: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

请注意该address.zip字段包含 2 个无序子字段。

您可以定义一个函数,该函数将递归地单步执行schema字段并对字段进行排序以构建 Spark-SQL 选择表达式:

from pyspark.sql.types import StructType, StructField

def schemaToSelectExpr(schema, baseField=""):
    select_cols = []
    for structField in sorted(schema, key=lambda x: x.name):
        if structField.dataType.typeName() == 'struct':

            subFields = []
            for fld in sorted(structField.jsonValue()['type']['fields'], 
                              key=lambda x: x['name']):
                newStruct = StructType([StructField.fromJson(fld)])
                newBaseField = structField.name
                if baseField:
                    newBaseField = baseField + "." + newBaseField
                subFields.extend(schemaToSelectExpr(newStruct, baseField=newBaseField))

            select_cols.append(
                "struct(" + ",".join(subFields) + ") AS {}".format(structField.name)
            )
        else:
            if baseField:
                select_cols.append(baseField + "." + structField.name)
            else:
                select_cols.append(structField.name)
    return select_cols
Run Code Online (Sandbox Code Playgroud)

在此 DataFrame 的架构上运行此命令会产生以下结果(为了便于阅读,我已将长“地址”字符串分成两行):

print(schemaToSelectExpr(df.schema))
#['_id',
#'struct(address.city,address.pin,address.street,
#        struct(address.zip.first5,address.zip.last4) AS zip) AS address',
# 'first_name',
# 'last_name']
Run Code Online (Sandbox Code Playgroud)

现在使用selectExpr对列进行排序:

df = df.selectExpr(schemaToSelectExpr(df.schema))
df.printSchema()
#root
# |-- _id: string (nullable = true)
# |-- address: struct (nullable = false)
# |    |-- city: string (nullable = true)
# |    |-- pin: integer (nullable = true)
# |    |-- street: string (nullable = true)
# |    |-- zip: struct (nullable = false)
# |    |    |-- first5: integer (nullable = true)
# |    |    |-- last4: integer (nullable = true)
# |-- first_name: string (nullable = true)
# |-- last_name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)