使用spark-avro在记录中跳过字段

ita*_*ysk 5 avro apache-spark spark-avro

更新:spark-avro软件包已更新以支持此方案.https://github.com/databricks/spark-avro/releases/tag/v3.1.0

我有一个AVRO文件,由我控制之外的第三方创建,我需要使用spark处理.AVRO架构是一个记录,其中一个字段是混合联合类型:

{    
    "name" : "Properties",                              
    "type" : {                                          
    "type" : "map",                                   
    "values" : [ "long", "double", "string", "bytes" ]
}                                                   
Run Code Online (Sandbox Code Playgroud)

spark-avro阅读器不支持此功能:

除了上面列出的类型之外,它还支持读取三种类型的联合类型:union(int,long)union(float,double)union(something,null),其中某些东西是上面列出的受支持的Avro类型之一,或者是支持的联合类型之一.

阅读AVRO的模式演变和解决方案,我希望能够通过指定省略此字段的不同读取器模式来跳过有问题的字段时读取文件.根据AVRO Schema Resolution文档,它应该工作:

如果作者的记录包含读者记录中不存在名称的字段,则忽略该作者对该字段的值.

所以我修改了使用

 val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)
Run Code Online (Sandbox Code Playgroud)

编写avroSchema器使用的完全相同的架构在哪里,但没有有问题的字段.

但是我仍然得到关于混合联合类型的相同错误.

AVRO支持这种架构演变的场景吗?与avro-spark?还有另一种方法来实现我的目标吗?


更新:我已经使用Apache Avro 1.8.1测试了相同的场景(实际上是相同的文件)并且它按预期工作.然后它必须具体spark-avro.有任何想法吗?

ita*_*ysk 5

更新:spark-avro软件包已更新以支持此方案.https://github.com/databricks/spark-avro/releases/tag/v3.1.0

这实际上并没有回答我的问题,而是针对同一问题的不同解决方案.

由于目前spark-avro没有此功能(请参阅我对该问题的评论) - 我使用了avro的org.apache.avro.mapreduce和spark的newAPIHadoopFile.这是一个简单的例子:

val path = "..."
val conf = new SparkConf().setAppName("avro test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
val sc = new SparkContext(conf)

val avroRdd = sc.newAPIHadoopFile(path,
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable])
Run Code Online (Sandbox Code Playgroud)

与spark-avro相反,官方avro libs支持混合联合类型和模式演变.