使用KafkaAvroDecoder将Avro消息反序列化为特定数据

kos*_*sii 3 java avro apache-kafka

我正在读一个Kafka主题,其中包含使用KafkaAvroEncoder(使用主题自动注册模式)序列化的Avro消息.我正在使用maven-avro-plugin生成普通的Java类,我想在阅读时使用它.

KafkaAvroDecoder只支持反序列化到GenericData.Record类型,(在我看来)错过具有静态类型语言的整点.我的反序列化代码目前看起来像这样:

    SpecificDatumReader<event> reader = new SpecificDatumReader<>(
        event.getClassSchema() // event is my class generated from the schema
    );
    byte[] in = ...; // my input bytes;
    ByteBuffer stuff = ByteBuffer.wrap(in);
    // the KafkaAvroEncoder puts a magic byte and the ID of the schema (as stored 
    //   in the schema-registry) before the serialized message
    if (stuff.get() != 0x0) {
        return;
    }
    int id = stuff.getInt();

    // lets just ignore those special bytes
    int length = stuff.limit() - 4 - 1;
    int start = stuff.position() + stuff.arrayOffset();

    Decoder decoder = DecoderFactory.get().binaryDecoder(
        stuff.array(), start, length, null
    );
    try {
        event ev = reader.read(null, decoder);
    } catch (IOException e) {
        e.printStackTrace();
    }
Run Code Online (Sandbox Code Playgroud)

我发现我的解决方案很麻烦,所以我想知道是否有更简单的解决方案来做到这一点.

kos*_*sii 6

由于评论,我能够找到答案.秘诀是KafkaAvroDecoder通过Properties指定特定Avro阅读器的使用来实例化,即:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_C?ONFIG, "...");
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    VerifiableProp vProps = new VerifiableProperties(props);

    KafkaAvroDecoder decoder = new KafkaAvroDecoder(vProps);
    MyLittleData data = (MyLittleData) decoder.fromBytes(input);
Run Code Online (Sandbox Code Playgroud)

相同的配置适用于直接使用KafkaConsumer<K, V>类的情况(我使用KafkaSpout来自storm-kafka项目的Storm中的Kafka消耗,使用SimpleConsumer,所以我必须手动反序列化消息.对于勇敢的风暴来说-kafka-client项目,它通过使用新样式消费者自动执行此操作.