Nir*_*jan 1 c# avro apache-kafka confluent-schema-registry confluent-platform
嗨,我正在使用 Confluence kafka。我有返回通用记录的消费者。我想反序列化它。我找不到任何办法。我可以手动完成每个字段,例如
object options = ((GenericRecord)response.Message.Value["Product"])["Options"];
Run Code Online (Sandbox Code Playgroud)
我在这里找到了一个
使用 C# 反序列化 Avro 文件 但是如何将架构转换为流?我想知道我们是否可以使用任何解决方案反序列化到我们的 c# 模型中?任何帮助将不胜感激。谢谢。
假设您正在使用confluent-dot-net客户端,您可以使用AvroDeserializer:
(异步)Avro 解串器。将此反序列化器与使用该工具
GenericRecord生成的 , 类型avrogen.exe或以下基元类型之一结合使用:int,long,float,double,boolean,string,byte[]。
例子:
var consumeTask = Task.Run(() =>{
using(var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig {
SchemaRegistryUrl = schemaRegistryUrl
}))
using(var consumer = new ConsumerBuilder < string, GenericRecord > (new ConsumerConfig {
BootstrapServers = bootstrapServers,
GroupId = groupName
}).SetKeyDeserializer(new AvroDeserializer < string > (schemaRegistry).AsSyncOverAsync()).SetValueDeserializer(new AvroDeserializer < GenericRecord > (schemaRegistry).AsSyncOverAsync()).SetErrorHandler((_, e) =>Console.WriteLine($ "Error: {e.Reason}")).Build()) {
consumer.Subscribe(topicName);
try {
while (true) {
try {
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($ "Key: {consumeResult.Message.Key}\nValue: {consumeResult.Value}");
}
catch(ConsumeException e) {
Console.WriteLine($ "Consume error: {e.Error.Reason}");
}
}
}
catch(OperationCanceledException) {
// commit final offsets and leave the group.
consumer.Close();
}
}
});
Run Code Online (Sandbox Code Playgroud)