Spark 中区分大小写的拼花模式合并

Grb*_*nho 4 apache-spark parquet

我正在尝试使用 Spark 加载和分析一些镶木地板文件。我schemaMerge用来加载文件,因为较新的文件有一些额外的列。还有一些文件的列名是小写的,而其他文件的列名是大写的。

例如

file1.parquet 有一个像

column1 integer,
column2 integer
Run Code Online (Sandbox Code Playgroud)

file2.parquet有类似的东西:

Column1 integer,
Column2 integer,
Column3 integer
Run Code Online (Sandbox Code Playgroud)

我遇到了类inferSchema方法的问题ParquetFileFormat。模式合并委托给StructType mergespark sql 的方法。据我所知,该方法只能以区分大小写的方式工作。在内部,它使用地图按名称查找字段,如果案例不匹配,它会将其解释为新字段。稍后,当检查模式是否存在重复项时,会考虑区分大小写的配置,我们最终会得到重复的列。这导致

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
Run Code Online (Sandbox Code Playgroud)

有没有办法让模式合并不区分大小写?

我期待得到这样的结果作为架构:

column1 integer,
column2 integer,
Column3 integer
Run Code Online (Sandbox Code Playgroud)

rlu*_*uta 5

您可以spark.sql.caseSensitive=true在配置中进行设置,使 Spark SQL 模式区分大小写。它还影响模式合并。

scala> spark.conf.set("spark.sql.caseSensitive","true")

scala> val df = sc.parallelize(1 to 1000).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.withColumnRenamed("value","VALUE").write.parquet("test_uc")

scala> df.write.parquet("test_lc")

scala> val df2=spark.read.option("mergeSchema","true").parquet("test_*")
df2: org.apache.spark.sql.DataFrame = [value: int, VALUE: int]

scala> val merged = df2.columns.groupBy(_.toLowerCase)
                   .map(t => coalesce(t._2.map(col):_*).as(t._1))
                   .toArray
merged: Array[org.apache.spark.sql.Column] = Array(coalesce(value, VALUE) AS `value`)

scala> df2.select(merged:_*)
res2: org.apache.spark.sql.DataFrame = [value: int]

scala> spark.conf.set("spark.sql.caseSensitive","false")

// process your dataframe
Run Code Online (Sandbox Code Playgroud)