sci*_*nds 7 java avro apache-kafka confluent-schema-registry
我是Kafka和Avro的菜鸟.所以我一直试图让Producer/Consumer运行起来.到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串:生产者的配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
Run Code Online (Sandbox Code Playgroud)
现在这一切都很好,当我尝试序列化POJO时问题就出现了.因此,我能够使用Avro提供的实用程序从POJO获取AvroSchema.对模式进行硬编码,然后尝试创建通用记录以通过KafkaProducer发送,生成器现在设置为:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)
这就是问题所在:当我使用KafkaAvroSerializer时,生产者没有出现由于: 缺少必需参数:schema.registry.url
我读到了为什么需要这样做,以便我的消费者能够破译生产者发给我的任何东西.但是,AvroMessage中是否已嵌入了架构?如果有人可以与KafkaAvroSerializer共享一个使用KafkaProducer的工作示例,而不必指定schema.registry.url,那将会非常棒.
也非常感谢关于模式注册表实用程序的任何见解/资源.
谢谢!
Tre*_*iac 21
首先注意:KafkaAvroSerializer
vanilla apache kafka中没有提供 - 它由Confluent Platform提供.(https://www.confluent.io/),作为其开源组件的一部分(http://docs.confluent.io/current/platform.html#confluent-schema-registry)
快速回答:不,如果您使用KafkaAvroSerializer
,则需要架构注册表.在这里查看一些示例:http:
//docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
架构注册表的基本思想是每个主题都将引用一个avro架构(即,您只能发送彼此一致的数据.但架构可以有多个版本,因此您仍需要为每个架构识别架构记录)
我们不想像你暗示的那样为每个数据编写模式 - 通常,模式比你的数据更大!每次阅读时都会浪费时间解析它,浪费资源(网络,磁盘,cpu)
相反,模式注册表实例将执行绑定avro schema <-> int schemaId
,然后序列化程序将在从注册表获取数据之后仅在数据之前写入此ID(并将其缓存以供以后使用).
所以在kafka中,你的记录将是[<id> <bytesavro>]
(和技术原因的魔术字节),这是一个只有5个字节的开销(与你的模式的大小比较)并且在阅读时,你的消费者会找到相应的模式到id,和deserializer关于它的avro字节.你可以在汇合的doc中找到更多的方法
如果您真的想要为每条记录编写模式,那么您将需要一个其他的序列化程序(我认为编写自己的序列化程序,但它很容易,只需重用https://github.com/confluentinc/schema- registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java并删除模式注册表部分以将其替换为模式,同样用于读取).但是如果你使用avro,我真的会劝阻这一点 - 一天之后,你需要实现像avro注册表这样的东西来管理版本控制
归档时间: |
|
查看次数: |
10954 次 |
最近记录: |