处理pySpark中的空数组(可选的二进制元素(UTF8)不是一个组)

lsm*_*mor 6 python apache-spark pyspark

我在 Spark 中有一个类似 json 的结构,如下所示:

>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
Run Code Online (Sandbox Code Playgroud)

根据分区的不同,some-array所有的可能都是一个空数组id。当这种情况发生时,spark 会推断出以下模式:

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string
Run Code Online (Sandbox Code Playgroud)

当然,如果我想读取多个分区,这是一个问题,因为 Spark 无法合并模式。我尝试手动定义架构,所以应该没有问题

>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
    element: struct
        array-field-1: string
        array-field-2: string
Run Code Online (Sandbox Code Playgroud)

到目前为止一切顺利,但是当我尝试实际收集数据时出现错误:

>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)

我不明白为什么会失败。不应存在​​架构导致的不兼容问题。如果您想知道,在不指定模式的情况下收集数据是可行的。

>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
    element: string   # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
Run Code Online (Sandbox Code Playgroud)

编辑

这里有一个 python 中的可重现的例子

from pyspark.sql.types import *


myschema = StructType([
   StructField('id', StringType())
 , StructField( 'some-array'
              , ArrayType(StructType([
                  StructField('array-field-1', StringType())
                , StructField('array-field-2', StringType())
                ])
              ))
  ])

path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema

read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!

Run Code Online (Sandbox Code Playgroud)

lsm*_*mor 6

我找到的解决方案是将选项设置dropFieldIfAllNullTrue读取 json 文件时。这会导致具有空数组的字段消失,从而使合并模式变得更容易。

>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id:string
Run Code Online (Sandbox Code Playgroud)

现在,不需要的类型推断将不适用,并且在读取同一文件的多个分区时,该选项mergeSchema将能够读取所有文件而不会发生冲突。