Pyspark - 循环遍历 structType 和 ArrayType 在 structfield 中进行类型转换

add*_*ted 5 python apache-spark pyspark

我对 pyspark 很陌生,这个问题让我感到困惑。基本上我正在寻找一种可扩展的方法来通过 structType 或 ArrayType 循环类型转换。

我的数据架构示例:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: integer (nullable = true)
 |    |-- rate_2: integer (nullable = true)
 |    |-- rate_3: integer (nullable = true)
 |    |-- card_fee: integer (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: integer (nullable = true)
 |    |    |-- rate_2: integer (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)
Run Code Online (Sandbox Code Playgroud)

正如你在这里看到的,card_rates是 struct 并且online_rates是一个 struct 数组。我正在寻找循环遍历上面所有字段并有条件地对它们进行类型转换的方法。理想情况下,如果它应该是数字,它应该转换为双精度,如果它应该是字符串,它应该转换为字符串。我需要循环,因为这些rate_*字段可能会随着时间的推移而增长。

但现在,我对能够循环它们并将它们全部类型转换为字符串感到满意,因为我对 pyspark 非常陌生,并且仍在尝试感受它。

我想要的输出模式:

root
 |-- _id: string (nullable = true)
 |-- created: timestamp (nullable = true)
 |-- card_rates: struct (nullable = true)
 |    |-- rate_1: double (nullable = true)
 |    |-- rate_2: double (nullable = true)
 |    |-- rate_3: double (nullable = true)
 |    |-- card_fee: double (nullable = true)
 |    |-- payment_method: string (nullable = true)
 |-- online_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- rate_1: double (nullable = true)
 |    |    |-- rate_2: double (nullable = true)
 |    |    |-- online_fee: double (nullable = true)
 |-- updated: timestamp (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我已经没有想法如何做到这一点了。

我从这里得到了参考:PySpark将数组内的结构字段转换为字符串

但此解决方案对字段进行硬编码,并且不会真正循环字段。

请帮忙。

abi*_*sis 9

这是借助内置函数的一种解决StructType.simpleString方案_parse_datatype_string

from pyspark.sql.types import *

df_schema = StructType([
  StructField("_id", StringType(), True),
  StructField("created", TimestampType(), True),
  StructField("card_rates", StructType([
                  StructField("rate_1", IntegerType(), True),
                  StructField("rate_2", IntegerType(), True),
                  StructField("rate_3", IntegerType(), True),
                  StructField("card_fee", IntegerType(), True),
                  StructField("card_fee", IntegerType(), True)])),
  StructField("online_rates", ArrayType(
                  StructType(
                    [
                      StructField("rate_1", IntegerType(),True),
                      StructField("rate_2", IntegerType(),True),
                      StructField("online_fee", DoubleType(),True)
                    ]),True),True),
  StructField("updated", TimestampType(), True)])

schema_str = df_schema.simpleString() # this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int, card_fee:int>,online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,updated:timestamp>

double_schema = schema_str.replace(':int', ':double')

# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema

Run Code Online (Sandbox Code Playgroud)
  1. 首先将您的架构转换为简单的字符串schema.simpleString
  2. 然后全部替换:int:double
  3. 最后将修改后的字符串模式转换为 StructType_parse_datatype_string

更新:

为了避免反引号问题,@jxc 指出更好的解决方案是对元素进行递归扫描,如下所示:

def transform_schema(schema):

  if schema == None:
    return StructType()

  updated = []
  for f in schema.fields:
    if isinstance(f.dataType, IntegerType):
      # if IntegerType convert to DoubleType
      updated.append(StructField(f.name, DoubleType(), f.nullable))
    elif isinstance(f.dataType, ArrayType):
      # if ArrayType unpack the array type(elementType), do recursion then wrap results with ArrayType 
      updated.append(StructField(f.name, ArrayType(transform_schema(f.dataType.elementType))))
    elif isinstance(f.dataType, StructType):
      # if StructType do recursion
      updated.append(StructField(f.name, transform_schema(f.dataType)))
    else:
      # else handle all the other cases i.e TimestampType, StringType etc
      updated.append(StructField(f.name, f.dataType, f.nullable))   

  return StructType(updated)

# call the function with your schema
transform_schema(df_schema)

Run Code Online (Sandbox Code Playgroud)

说明:该函数遍历架构 (StructType) 上的每个项目,并尝试将 int 字段 (StructField) 转换为 double。最后将转换后的schema(StructType)传递给上层(父StructType)。

输出:

StructType(List(
  StructField(_id,StringType,true),
  StructField(created,TimestampType,true),
  StructField(card_rates,
              StructType(List(StructField(rate_1,DoubleType,true),
                              StructField(rate_2,DoubleType,true),
                              StructField(rate_3,DoubleType,true),
                              StructField(card_fee,DoubleType,true),
                              StructField(card_fee,DoubleType,true))),true),
  StructField(online_rates,ArrayType(
    StructType(List(
      StructField(rate_1,DoubleType,true),
      StructField(rate_2,DoubleType,true),
      StructField(online_fee,DoubleType,true))),true),true),
  StructField(updated,TimestampType,true)))
Run Code Online (Sandbox Code Playgroud)

更新:(2020-02-02)

下面是一个关于如何将新模式与现有数据框一起使用的示例:

StructType(List(
  StructField(_id,StringType,true),
  StructField(created,TimestampType,true),
  StructField(card_rates,
              StructType(List(StructField(rate_1,DoubleType,true),
                              StructField(rate_2,DoubleType,true),
                              StructField(rate_3,DoubleType,true),
                              StructField(card_fee,DoubleType,true),
                              StructField(card_fee,DoubleType,true))),true),
  StructField(online_rates,ArrayType(
    StructType(List(
      StructField(rate_1,DoubleType,true),
      StructField(rate_2,DoubleType,true),
      StructField(online_fee,DoubleType,true))),true),true),
  StructField(updated,TimestampType,true)))
Run Code Online (Sandbox Code Playgroud)