小编sci*_*nds的帖子

KafkaAvroSerializer用于在没有schema.registry.url的情况下序列化Avro

我是Kafka和Avro的菜鸟.所以我一直试图让Producer/Consumer运行起来.到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串:生产者的配置:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}
Run Code Online (Sandbox Code Playgroud)

现在这一切都很好,当我尝试序列化POJO时问题就出现了.因此,我能够使用Avro提供的实用程序从POJO获取AvroSchema.对模式进行硬编码,然后尝试创建通用记录以通过KafkaProducer发送,生成器现在设置为:

    Properties props = new …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka confluent-schema-registry

7
推荐指数
1
解决办法
1万
查看次数