V. *_*mma 19 emr apache-spark parquet apache-spark-sql spark-dataframe
我遇到了一个问题,我将Parquet数据作为S3中的每日块(以形式s3://bucketName/prefix/YYYY/MM/DD/
)但我无法从不同的日期读取AWS EMR Spark中的数据,因为某些列类型不匹配,我得到许多异常中的一个,例如:
java.lang.ClassCastException: optional binary element (UTF8) is not a group
Run Code Online (Sandbox Code Playgroud)
在某些文件中出现的数组类型具有值,但同一列可能null
在其他文件中具有值,然后将其推断为String类型.
要么
org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
Run Code Online (Sandbox Code Playgroud)
我在S3中以JSON格式存在原始数据,我最初的计划是创建一个自动作业,启动一个EMR集群,读取前一个日期的JSON数据,然后将其作为镶木地板写回S3.
JSON数据也分为日期,即键具有日期前缀.阅读JSON工作正常.无论当前正在读取多少数据,都可以从数据中推断出模式.
但是当编写镶木地板文件时问题就会出现.据我所知,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有零件/分区的模式.对我而言,似乎也可以使用不同的模式.当我禁用写入元数据时,据说Spark从给定Parquet路径中的第一个文件推断整个模式,并假设它通过其他文件保持不变.
当一些应该是double
类型的列只有给定日期的整数值时,从JSON读取它们(它们将这些数字作为整数,没有浮点数)使得Spark认为它是一个具有类型的列long
.即使我可以在编写Parquet文件之前将这些列转换为double,但这仍然不好,因为架构可能会更改,可以添加新列,并且无法跟踪此列.
我看到有些人有同样的问题,但我还没有找到一个足够好的解决方案.
有什么最佳实践或解决方案?
这些是我用于将镶木地板写入S3的选项; 关闭模式合并提高写回性能 - 也可以解决您的问题
val PARQUET_OPTIONS = Map(
"spark.sql.parquet.mergeSchema" -> "false",
"spark.sql.parquet.filterPushdown" -> "true")
Run Code Online (Sandbox Code Playgroud)
当我从JSON的每日块中读取数据并在每日S3文件夹中写入Parquet时,在写入Jquet之前没有指定我自己的模式或在写入Parquet之前将容易出错的列转换为正确的类型,Spark可能会推断出不同日期的不同模式数据取决于数据实例中的值并写入具有冲突模式的Parquet文件.
它可能不是一个完美的解决方案,但我发现通过不断发展的架构解决我的问题的唯一方法如下:
在我每天(更具体地说是夜间)批量处理前一天数据的cron工作之前,我正在创建一个大多数为空值的虚拟对象.
我确保ID是可识别的,例如,当真实数据具有唯一ID-s时,我将"dummy"字符串作为ID添加到虚拟数据对象.
然后我将给出具有容易出错类型的属性的期望值,例如我将给出浮点数/双精度非零值,因此当编组到JSON时,它们肯定会有小数分隔符,例如"0.2"而不是"0"(当编组到JSON时,带有0值的双精度/浮点数显示为"0"而不是"0.0").
字符串和布尔值和整数工作正常,但除了双精度/浮点数之外,我还需要将数组实例化为空数组和其他类/结构的对象以及相应的空对象,因此它们不会是"null"-s,因为Spark读取null-s in as strings.
然后,如果我填充了所有necessery字段,我将把对象编组为JSON并将文件写入S3.
然后我会在我的Scala批处理脚本中使用这些文件来读取它们,将模式保存到变量中,并在读取真实的JSON数据时将此模式作为参数提供,以避免Spark执行自己的模式推断.
这样我知道所有字段都是相同的类型,并且只有在添加新字段时才需要加入模式.
当然,当添加易出错类型的新字段时,它会增加手动更新虚拟对象创建的缺点,但这是一个小缺点,因为它是我发现的唯一有效的解决方案.
归档时间: |
|
查看次数: |
13651 次 |
最近记录: |