如何使用 C# 反序列化 Kafka 中的 Avro 消息

Nir*_*jan 1 c# avro apache-kafka confluent-schema-registry confluent-platform

嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如

 object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)

我在这里找到了一个

使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。

Gio*_*ous 5

假设您正在使用confluent-dot-net客户端,您可以使用AvroDeserializer

(异步)Avro 解串器。将此反序列化器与使用该工具GenericRecord生成的 , 类型avrogen.exe或以下基元类型之一结合使用:int, long, float, double, boolean, string, byte[]

例子

var consumeTask = Task.Run(() =>{
  using(var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig {
    SchemaRegistryUrl = schemaRegistryUrl
  }))
  using(var consumer = new ConsumerBuilder < string, GenericRecord > (new ConsumerConfig {
    BootstrapServers = bootstrapServers,
    GroupId = groupName
  }).SetKeyDeserializer(new AvroDeserializer < string > (schemaRegistry).AsSyncOverAsync()).SetValueDeserializer(new AvroDeserializer < GenericRecord > (schemaRegistry).AsSyncOverAsync()).SetErrorHandler((_, e) =>Console.WriteLine($ "Error: {e.Reason}")).Build()) {
    consumer.Subscribe(topicName);

    try {
      while (true) {
        try {
          var consumeResult = consumer.Consume(cts.Token);

          Console.WriteLine($ "Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
        }
        catch(ConsumeException e) {
          Console.WriteLine($ "Consume error: {e.Error.Reason}");
        }
      }
    }
    catch(OperationCanceledException) {
      // commit final offsets and leave the group.
      consumer.Close();
    }
  }
});
Run Code Online (Sandbox Code Playgroud)