读取镶木地板文件时,无法解析使用int和double的合并架构

Joh*_*agg 8 scala apache-spark apache-spark-sql

我有两个镶木地板文件,一个包含整数字段myField,另一个包含双字段myField.尝试同时读取这两个文件时

val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read.option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")
Run Code Online (Sandbox Code Playgroud)

我收到以下错误

Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType
Run Code Online (Sandbox Code Playgroud)

传递显式模式时

val schema = StructType(Seq(new StructField("myField", IntegerType)))
val result = spark.sqlContext.read.schema(schema).option("mergeSchema", true).option("basePath", basePath).parquet(Seq(fileWithInt, fileWithDouble): _*).select("myField")
Run Code Online (Sandbox Code Playgroud)

它失败了以下内容

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
Run Code Online (Sandbox Code Playgroud)

当铸造一个双

val schema = StructType(Seq(new StructField("myField", DoubleType)))
Run Code Online (Sandbox Code Playgroud)

我明白了

java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)
Run Code Online (Sandbox Code Playgroud)

除了重新处理源数据之外,有没有人知道解决这个问题的方法.

Mik*_*nte 6

根据您要读取的文件数量,您可以使用以下两种方法之一:

这最适合数量较少的镶木地板文件

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
    import spark.implicits._

    paths.par.map {
      path =>
        spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
    }.reduce(_.union(_))
  }
Run Code Online (Sandbox Code Playgroud)

这种方法可以更好地处理大量文件,因为它可以保持沿袭较短

def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
    import spark.implicits._

    spark.sparkContext.union(paths.par.map {
      path =>
        spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType)).as[Double].rdd
    }.toList).toDF
  }
Run Code Online (Sandbox Code Playgroud)