Deserializing Avro in Spark

m1n*_*keh 5 c# apache-spark azure-stream-analytics spark-avro azure-databricks

I am pushing a stream of data to Azure EventHub with the following code, which uses Microsoft.Hadoop.Avro. It runs every 5 seconds and simply inserts the same two Avro-serialised items:

  // load the Avro schema and derive the root record schema
  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  {
      // write the records into an Avro object container in memory
      using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
      {
          using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
          {
              foreach (var item in itemList)
              {
                  streamWriter.Write(item);
              }
          }
      }

      // send the serialised payload to the Event Hub (note: SendAsync is not awaited here)
      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  }

The schema used here is likewise very simple:

{
  "type": "record",
  "name": "User",
  "namespace": "SerDes",
  "fields": [
    {
      "name": "FirstName",
      "type": "string"
    },
    {
      "name": "LastName",
      "type": "string"
    }
  ]
}

I have verified that all of this works fine with a simple view in Azure Stream Analytics on the portal:

[Stream Analytics screenshot]

So far so good, but I cannot for the life of me deserialise it correctly in Databricks using the from_avro() function under Scala.

Loading the (exact same) schema as a string:

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

Configuring the Event Hub:

import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

Reading the data:

// this works, and I can see the serialised data
display(eventhubs.select($"body"))

// this fails with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))

So basically, what is going on here? I am serialising the data with the same schema I deserialise it with, yet something is malformed. Documentation on this front is incredibly sparse (next to none on the Microsoft website).

nos*_*ame 2

The problem

After additional investigation (mainly with the help of this article), I found what my problem was: from_avro(data: Column, jsonFormatSchema: String) expects a Spark schema format, not an Avro schema format. The documentation is not very clear on this.
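For illustration, here is a minimal sketch of that conversion, assuming the schema.json from the question and the SchemaConverters helper that ships with the spark-avro package:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// read the Avro schema exactly as in the question
val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

// parse it and convert it into the Spark SQL type that Spark-side
// parsing functions such as from_json understand
val parsedAvroSchema = new Schema.Parser().parse(sampleJsonSchema)
val sparkSchema = SchemaConverters.toSqlType(parsedAvroSchema).dataType

// prints the equivalent Spark type, e.g.
// StructType(StructField(FirstName,StringType,false), StructField(LastName,StringType,false))
println(sparkSchema)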

Solution 1

Databricks provides a handy method, from_avro(column: Column, subject: String, schemaRegistryUrl: String), which fetches the required Avro schema from a Kafka schema registry and converts it to the correct format automatically.

Unfortunately, it is not available in plain Spark, nor is there a way to use it without a Kafka schema registry.
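If you are on Databricks Runtime, the call looks roughly like this (a sketch only; the subject name and registry URL are placeholders, and the exact import has varied across runtime versions):

import org.apache.spark.sql.avro.functions._

// "user-value" and the registry URL below are hypothetical placeholders
display(
  eventhubs.select(
    from_avro($"body", "user-value", "https://<SCHEMA_REGISTRY_URL>").as("data")
  )
)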

Solution 2

Use the schema conversion provided by Spark:

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.{col, from_json}

// define an avro deserializer that renders each record as a JSON string
class AvroDeserializer(client: SchemaRegistryClient) extends AbstractKafkaAvroDeserializer {
  this.schemaRegistry = client

  override def deserialize(payload: Array[Byte]): String = {
    // call the parent implementation via super (calling this.deserialize would recurse forever)
    val genericRecord = super.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

// schema registry client, shared by the deserializer and the schema lookup below
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)

// create deserializer instance
val deserializer = new AvroDeserializer(registryClient)

// register deserializer as a UDF
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// get the avro schema from the registry (but I presume it should also work with a schema read from a local file)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// consume the data
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"), sparkSchema.dataType).as("data"))
  .select("data.*")
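The design here is deliberately two-step: the UDF turns the registry-framed Avro bytes into a JSON string (GenericRecord.toString emits the record as JSON), and from_json then applies the Spark schema obtained from SchemaConverters.toSqlType. This sidesteps the schema-format issue with from_avro entirely, at the cost of an intermediate JSON representation.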

  • from_avro with direct Schema Registry support only works on Databricks, as far as I remember... in plain Spark it needs the JSON schema, which you can fetch from the registry over HTTP (2 upvotes)
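For completeness, fetching that JSON schema over HTTP is a one-liner against the registry's REST API; a rough sketch (the host and subject name here are hypothetical):

import scala.io.Source
import com.fasterxml.jackson.databind.ObjectMapper

// GET /subjects/<subject>/versions/latest returns JSON whose "schema" field holds the Avro schema
val resp = Source.fromURL("http://<REGISTRY_HOST>:8081/subjects/user-value/versions/latest").mkString
val avroJsonSchema = new ObjectMapper().readTree(resp).get("schema").asText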