带有Kafka的Kafka的死信队列(DLQ)

Evg*_*yst 5 spring-integration dead-letter apache-kafka spring-boot spring-kafka

使用spring-kafka 2.1.x在Spring Boot 2.0应用程序中实现死信队列(DLQ)概念的最佳方法是,将某个bean 的@KafkaListener方法无法处理的所有消息发送到某个预定义的Kafka DLQ主题不会丢失一条消息?

因此,消耗的卡夫卡记录是:

  1. 成功处理,
  2. 处理失败并发送到DLQ主题,
  3. 未能处理,未发送到DLQ主题(由于意外问题),因此监听器将再次使用它。

我尝试使用ErrorHandler的自定义实现创建侦听器容器,从而无法使用KafkaTemplate将记录处理为DLQ主题。使用禁用的自动提交和RECORD AckMode。

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}
Run Code Online (Sandbox Code Playgroud)

似乎此实现不能保证第3项。如果将在DlqErrorHandler中引发异常,则记录将不会再次由侦听器使用。

使用事务侦听器容器是否有帮助?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
Run Code Online (Sandbox Code Playgroud)

是否有使用Spring Kafka实施DLQ概念的便捷方法?

更新28/03/2018

多亏了加里·罗素(Gary Russell)的回答,我能够通过如下实现DlqErrorHandler来实现所需的行为

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

这样,如果消费者调查返回3条记录(1、2、3),而第二条记录将无法处理:

  • 1将被处理
  • 2将无法处理并发送到DLQ
  • 3感谢消费者寻求record.offset()+ 1,它将被传递给侦听器

如果发送给DLQ失败,则消费者寻求到record.offset(),并且记录将重新传递给侦听器(发送给DLQ的消息可能会被淘汰)。

Gar*_*ell 5

SeekToCurrentErrorHandler

当发生异常时,它会寻找消费者,以便在下一次轮询时重新传递所有未处理的记录。

You can use the same technique (e.g. a subclass) to write to the DLQ and seek the current offset (and other unprocessed) if the DLQ write fails, and seek just the remaining records if the DLQ write succeeds.

  • 此外,在答案发布后不久就添加了“DeadLetterPublishingRecoverer”。非阻塞重试并不适合所有情况 - 例如,当需要严格的记录顺序处理时。 (2认同)