python Spark avro

Rol*_*ndo 18 python avro apache-spark

在尝试编写avro时,我收到以下错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 35.0 failed 1 times, most recent failure: Lost task 7.0 in stage 35.0 (TID 110, localhost): java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.avro.mapred.AvroWrapper
Run Code Online (Sandbox Code Playgroud)

我使用以下3条记录读取了avro文件:

avro_rdd = sc.newAPIHadoopFile(
    "threerecords.avro",
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=None)

output = avro_rdd.map(lambda x: x[0]).collect()
Run Code Online (Sandbox Code Playgroud)

然后我尝试写出一条记录(avro中保存的输出):

conf = {"avro.schema.input.key": reduce(lambda x, y: x + y, sc.textFile("myschema.avsc", 1).collect())}

sc.parallelize([output[0]]).map(lambda x: (x, None)).saveAsNewAPIHadoopFile(
    "output.avro",
    "org.apache.avro.mapreduce.AvroKeyOutputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
    conf=conf)
Run Code Online (Sandbox Code Playgroud)

如何解决这个错误/写出一个单独的avro记录?我知道我的架构是正确的,因为它来自avro本身.

rfk*_*aas 4

目前似乎不支持此功能。您现在尝试使用 java 映射作为 Avro 记录并再次将其转换为 Java 映射。这就是为什么您会收到有关 java hashmap 的错误。

staslos 提出了一个添加 Avro 输出格式的拉取请求,请参阅拉取请求和示例的链接。

AvroConverters.scala 中缺少一个转换器,用于将 java 映射转换回 avro 格式。