Spring Kafka Consumer 如何跳过 Avro Deserializer 异常

ttt*_*ttt 5 java spring apache-kafka spring-kafka

我正在使用 Spring Kafka 消费者和 Avro 架构来构建我的应用程序。

但是,如果消息无法反序列化到我构建的指定 Avro 特定记录,则消费者将不断地一遍又一遍地重试消费相同的消息(无限重试)。

对于这种情况,如果我的消费者发生反序列化器异常,如何配置消费者应用程序以跳过当前消息并移至下一个偏移量。

我查看了 Spring Kafka 错误句柄,它只能处理侦听器中的异常,而不能处理反序列化阶段的异常。

我的消费者应用程序非常简单:

@KafkaListener(id = "demo-consumer-stream-group", topics = "customer-output-")
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }
Run Code Online (Sandbox Code Playgroud)

基于此代码,有时接收到的消息可能无法反序列化为正确的Customer对象。

另外,我看到最近的解决方案是使用ErrorHandlingDeserializer2Spring Kafka 来处理这个问题,但是由于我正在使用KafkaAvroDeserializer如何计算出这些配置?我当前的配置是:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
Run Code Online (Sandbox Code Playgroud)

Gar*_*ell 0

文档中对此进行了解释。

您可以通过自定义 Spring 属性将反序列化器设置为错误处理反序列化器及其委托。

您可以使用 DefaultKafkaConsumerFactory 构造函数,该构造函数采用键和值 Deserializer 对象,并连接到您已使用适当委托配置的适当 ErrorHandlingDeserializer2 实例中。或者,您可以使用使用者配置属性(由 ErrorHandlingDeserializer 使用)来实例化委托。属性名称为 ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS。属性值可以是类或类名。以下示例展示了如何设置这些属性:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
Run Code Online (Sandbox Code Playgroud)