AvroRuntimeException: Not a union: {"type":"long","logicalType":"timestamp-millis"}

Cas*_*sie 5 hdfs avro apache-spark

I am trying to save data from a Spark DataFrame to HDFS using an Avro schema stored in a schema registry. However, I get an error while writing the data:

Caused by: org.apache.avro.AvroRuntimeException: Not a union: {"type":"long","logicalType":"timestamp-millis"}
    at org.apache.avro.Schema.getTypes(Schema.java:299)
    at org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:209)
    at org.apache.spark.sql.avro.AvroSerializer$$anonfun$3.apply(AvroSerializer.scala:208)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:296)
    at org.apache.spark.sql.avro.AvroSerializer.newStructConverter(AvroSerializer.scala:208)
    at org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:51)
    at org.apache.spark.sql.avro.AvroOutputWriter.serializer$lzycompute(AvroOutputWriter.scala:42)
    at org.apache.spark.sql.avro.AvroOutputWriter.serializer(AvroOutputWriter.scala:42)
    at org.apache.spark.sql.avro.AvroOutputWriter.write(AvroOutputWriter.scala:64)
    at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:137)

What could be the cause?

The field in the Avro schema looks like this:

{"name":"CreateDate","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}

Here is a sample value showing the date format:

1900-01-01 00:00:00

The data type of this field in the Spark DataFrame:

|-- CreateDate: timestamp (nullable = true)

This is how I write the data:

dataDF.write
  .mode("append")
  .format("avro")
  .option(
    "avroSchema",
    SchemaRegistry.getSchema(
      schemaRegistryConfig.url,
      schemaRegistryConfig.dataSchemaSubject,
      schemaRegistryConfig.dataSchemaVersion))
  .save(hdfsURL)
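The schema printed in the exception is a bare timestamp-millis long, not the ["null", ...] union declared for CreateDate, and the trace fails in resolveNullableType. This suggests that some other field may be nullable in the DataFrame while its Avro type is not a union. A minimal sketch for narrowing this down follows. It assumes the registry returns the schema as a JSON string describing a top-level record. The helper name nullableMismatches is hypothetical and not part of Spark or Avro.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper (not a Spark or Avro API).
// Reports top-level columns that are nullable in the DataFrame
// but whose Avro type is neither a union nor null.
def nullableMismatches(avroSchemaJson: String, sparkSchema: StructType): Seq[String] = {
  // Assumption: the JSON describes a record schema, so getFields is valid.
  val avroSchema = new Schema.Parser().parse(avroSchemaJson)
  val avroTypes: Map[String, Schema] =
    avroSchema.getFields.asScala.map(f => f.name -> f.schema).toMap

  sparkSchema.fields.toSeq.flatMap { field =>
    avroTypes.get(field.name).collect {
      case avroType
          if field.nullable &&
            avroType.getType != Schema.Type.UNION &&
            avroType.getType != Schema.Type.NULL =>
        s"${field.name}: nullable in the DataFrame, Avro type is $avroType"
    }
  }
}
```

For example, with schemaJson holding the string returned by SchemaRegistry.getSchema(...), calling nullableMismatches(schemaJson, dataDF.schema).foreach(println) would list the candidate fields. Nested records are not inspected in this sketch.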