镶木地板上的 Spark MergeSchema

Aru*_*n S 2 scala azure apache-spark databricks

对于模式演化 Mergeschema 可以在 Spark 中用于 Parquet 文件格式,我对此有以下说明

这是否仅支持 Parquet 文件格式或任何其他文件格式,如 csv、txt 文件。

如果在中间添加新的附加列,我知道 Mergeschema 会将列移到最后。

如果列顺序受到干扰,那么 Mergeschema 是否会在创建时将列对齐到正确的顺序,还是我们需要通过选择所有列来手动执行此操作。

从评论更新: 例如,如果我有一个如下的架构并创建如下表 -spark.sql("CREATE TABLE emp USING DELTA LOCATION '****'") empid,empname,salary====> 001,ABC,10000如果我得到以下格式,第二天empid,empage,empdept,empname,salary====> 001,30,XYZ,ABC,10000

是否有新列 - 之后empage, empdept会添加empid,empname,salary columns

Ram*_*ram 6

Q : 1.这是否仅支持 Parquet 文件格式或任何其他文件格式,如 csv、txt 文件。 2.如果列顺序受到干扰,那么Mergeschema是否会在创建时将列对齐到正确的顺序,还是我们需要通过选择所有列来手动执行此操作


AFAIK 合并模式仅由 parquet 支持,而不由其他格式(如 csv 、txt)支持。

Mergeschema ( spark.sql.parquet.mergeSchema) 会以正确的顺序对齐列,即使它们是分布的。

来自关于parquet schema-merging 的spark 文档的示例 :

import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
Run Code Online (Sandbox Code Playgroud)

更新:您在评论框中给出的真实示例...


问:是否empage, empdept会在新栏目之后添加 empid,empname,salary columns


回答:是的 EMPAGE、EMPDEPT 是在 EMPID、EMPNAME、SALARY 之后添加的,然后是您的日期列。

查看完整示例。

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SaveMode


object CSVDataSourceParquetSchemaMerge extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("CSVParquetSchemaMerge")
    .master("local")
    .getOrCreate()


  import spark.implicits._

  val csvDataday1 = spark.sparkContext.parallelize(
    """
      |empid,empname,salary
      |001,ABC,10000
    """.stripMargin.lines.toList).toDS()
  val csvDataday2 = spark.sparkContext.parallelize(
    """
      |empid,empage,empdept,empname,salary
      |001,30,XYZ,ABC,10000
    """.stripMargin.lines.toList).toDS()

  val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday1)

  println("first day data ")
  frame.show
  frame.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=1")
  frame.printSchema

  val frame1 = spark.read.option("header", true).option("inferSchema", true).csv(csvDataday2)
  frame1.write.mode(SaveMode.Overwrite).parquet("data/test_table/day=2")
  println("Second day data ")

  frame1.show(false)
  frame1.printSchema

  // Read the partitioned table
  val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
  println("Merged Schema")
  mergedDF.printSchema
  println("Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column")
  mergedDF.show(false)


}


Run Code Online (Sandbox Code Playgroud)

结果 :

first day data 
+-----+-------+------+
|empid|empname|salary|
+-----+-------+------+
|    1|    ABC| 10000|
+-----+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Second day data 
+-----+------+-------+-------+------+
|empid|empage|empdept|empname|salary|
+-----+------+-------+-------+------+
|1    |30    |XYZ    |ABC    |10000 |
+-----+------+-------+-------+------+

root
 |-- empid: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)

Merged Schema
root
 |-- empid: integer (nullable = true)
 |-- empname: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- empage: integer (nullable = true)
 |-- empdept: string (nullable = true)
 |-- day: integer (nullable = true)

Merged Datarame where EMPAGE,EMPDEPT WERE ADDED AFER EMPID,EMPNAME,SALARY followed by your day column
+-----+-------+------+------+-------+---+
|empid|empname|salary|empage|empdept|day|
+-----+-------+------+------+-------+---+
|1    |ABC    |10000 |30    |XYZ    |2  |
|1    |ABC    |10000 |null  |null   |1  |
+-----+-------+------+------+-------+---+

Run Code Online (Sandbox Code Playgroud)

目录树:

在此处输入图片说明