lsm*_*mor 6 python apache-spark pyspark
我在 Spark 中有一个类似 json 的结构,如下所示:
>>> df = spark.read.parquet(good_partition_path)
id: string
some-array: array
element: struct
array-field-1: string
array-field-2: string
Run Code Online (Sandbox Code Playgroud)
根据分区的不同,some-array所有的可能都是一个空数组id。当这种情况发生时,spark 会推断出以下模式:
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
element: string
Run Code Online (Sandbox Code Playgroud)
当然,如果我想读取多个分区,这是一个问题,因为 Spark 无法合并模式。我尝试手动定义架构,所以应该没有问题
>>> df = spark.read.schema(good_schema).parquet(bad_partition_path)
id: string
some-array: array
element: struct
array-field-1: string
array-field-2: string
Run Code Online (Sandbox Code Playgroud)
到目前为止一切顺利,但是当我尝试实际收集数据时出现错误:
>>> df.head(5)
# Long error message
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)
我不明白为什么会失败。不应存在架构导致的不兼容问题。如果您想知道,在不指定模式的情况下收集数据是可行的。
>>> df = spark.read.parquet(bad_partition_path)
id: string
some-array: array
element: string # infers wrong schema
>>> df.head(5)
[Row(...)] # actually works
Run Code Online (Sandbox Code Playgroud)
这里有一个 python 中的可重现的例子
from pyspark.sql.types import *
myschema = StructType([
StructField('id', StringType())
, StructField( 'some-array'
, ArrayType(StructType([
StructField('array-field-1', StringType())
, StructField('array-field-2', StringType())
])
))
])
path_writeKO = "path/to/parquet"
jsonKO = '{"id": "OK", "some-array": []}'
dfKO = sc.parallelize([jsonKO])
dfKO = spark.read.json(dfKO)
dfKO.write.parquet(path_writeKO) # write without schema
read_error = spark.read.schema(myschema).parquet(path_writeKO) # read with schema
read_error.collect() # Fails!!
Run Code Online (Sandbox Code Playgroud)
我找到的解决方案是将选项设置dropFieldIfAllNull为True读取 json 文件时。这会导致具有空数组的字段消失,从而使合并模式变得更容易。
>>> jsonKO = '{"id": "OK", "some-array": []}'
>>> dfKO = sc.parallelize([jsonKO])
>>> dfKO = spark.read.option('dropFieldIfAllNull', True).json(dfKO)
id:string
Run Code Online (Sandbox Code Playgroud)
现在,不需要的类型推断将不适用,并且在读取同一文件的多个分区时,该选项mergeSchema将能够读取所有文件而不会发生冲突。
| 归档时间: |
|
| 查看次数: |
7558 次 |
| 最近记录: |