小编mde*_*dev的帖子

Kafka:轮询期间的反序列化问题

几天以来,我一直在玩融合版本的 kafka,以更好地了解平台。对于发送到一个主题的某些格式错误的 avro 消息,我收到了一些序列化异常。让我用事实来解释这个问题:

<kafka.new.version>0.10.2.0-cp1</kafka.new.version>
<confluent.version>3.2.0</confluent.version>
<avro.version>1.7.7</avro.version>
Run Code Online (Sandbox Code Playgroud)

意图:非常简单,Producer 发送 Avro 记录,Consumer 应该毫无问题地消费所有记录,(它可以使所有消息与架构注册表中的架构不兼容。)用法:

Producer -> 
Key -> StringSerializer
Value -> KafkaAvroSerializer

Consumer ->
Key -> StringDeserializer
Value -> KafkaAvroDeserializer
Run Code Online (Sandbox Code Playgroud)

其他消费者属性(仅供参考):

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer-4");
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someclient-4");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    properties.put("schema.registry.url", "schemaregistryhost:8081");
Run Code Online (Sandbox Code Playgroud)

我能够毫无问题地使用消息,直到其他一些生产者错误地向该主题发送了一条消息并修改了架构注册表中的最新架构。(我们在架构注册表中启用了一个选项,因此您可以向主题发送任何消息,架构注册表每次都会创建一个新版本的架构,如果关闭,我们也可以关闭。)

现在,由于这一个坏消息,则poll()是序列化问题失败。它确实给了我失败的偏移量,我可以通过使用 seek() 传递偏移量,但这听起来不太好。我还尝试使用最大轮询记录为 10 并将 poll() 超时设置为非常小,以便我可以通过捕获异常来忽略最多 10 条记录,但由于某种原因,max-records 不起作用并且代码立即失败并出现序列化错误,即使我从开始和坏消息在 240 偏移处。

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
Run Code Online (Sandbox Code Playgroud)

另一个简单的解决方案是在我的应用程序中使用 ByteArrayDeserializer 并使用 KafkaAvroDecoder,我可以处理反序列化问题。

我相信我缺少某些东西或做错了。也添加例外:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic.ongo.test3.user14-0 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api confluent-platform

5
推荐指数
1
解决办法
5972
查看次数