I have two parquet files: one contains an integer field myField, the other a double field myField. When I try to read both files at once:
val basePath = "/path/to/file/"
val fileWithInt = basePath + "intFile.snappy.parquet"
val fileWithDouble = basePath + "doubleFile.snappy.parquet"
val result = spark.sqlContext.read
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")
I get the following error:
Caused by: org.apache.spark.SparkException: Failed to merge fields 'myField' and 'myField'. Failed to merge incompatible data types IntegerType and DoubleType
When I pass an explicit schema:
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("myField", IntegerType)))
val result = spark.sqlContext.read
  .schema(schema)
  .option("mergeSchema", true)
  .option("basePath", basePath)
  .parquet(Seq(fileWithInt, fileWithDouble): _*)
  .select("myField")
it fails with the following:
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
And when I use DoubleType in the schema instead:
val schema = StructType(Seq(StructField("myField", DoubleType)))
I get:
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToDouble(Dictionary.java:60)
Does anyone know a way around this problem, other than reprocessing the source data?
Depending on the number of files you need to read, you can use one of the two approaches below.
This one works best for a small number of parquet files:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.DoubleType

def merge(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._
  // Read each file on its own, cast the conflicting column to double,
  // then union the resulting DataFrames pairwise
  paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType))
  }.reduce(_.union(_))
}
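For the two files from the question, usage would look roughly like this (a minimal sketch; fileWithInt and fileWithDouble are the paths defined above):

val merged = merge(spark, Seq(fileWithInt, fileWithDouble))
merged.printSchema() // myField should now be double in every partition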
The second approach handles a large number of files better, because it keeps the lineage short:
def merge2(spark: SparkSession, paths: Seq[String]): DataFrame = {
  import spark.implicits._
  // Union at the RDD level, so the plan stays a single flat union
  // instead of a deep chain of DataFrame unions
  spark.sparkContext.union(paths.par.map { path =>
    spark.read.parquet(path).withColumn("myField", $"myField".cast(DoubleType)).as[Double].rdd
  }.toList).toDF("myField") // a bare toDF would rename the column to "value"
}
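To see the difference in lineage, you can compare the two plans with toDebugString, which prints an RDD's dependency chain (a sketch; manyPaths is a hypothetical Seq[String] of parquet file paths):

// manyPaths is a hypothetical list of parquet paths, one per file
println(merge(spark, manyPaths).rdd.toDebugString)  // chain of unions that grows with the file count
println(merge2(spark, manyPaths).rdd.toDebugString) // a single flat UnionRDD over all files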