Kafka、Spring Kafka 和重新传递旧消息

ale*_*oid 3 apache-kafka spring-boot spring-kafka

我将 Kafka 和 Spring Boot 与 Spring Kafka 一起使用。在应用程序异常终止然后重新启动后,我的应用程序开始从 Kafka 队列接收旧的、已经处理的消息。

这可能是什么原因以及如何找到和解决问题?

我的卡夫卡属性:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
Run Code Online (Sandbox Code Playgroud)

我的 Spring Kafka 工厂和听众:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    Post post = consumerRecord.value();

    // do some logic here

    ack.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)

Ale*_*eld 7

使用 Kafka 时,客户端需要自己提交偏移量。这与其他消息代理形成对比,例如 AMQP 代理,其中代理跟踪客户端已经收到的消息。

在您的情况下,您不会自动提交偏移量,因此 Kafka 希望您手动提交它们(因为此设置:)spring.kafka.consumer.enable-auto-commit=false。如果您没有在程序中手动提交偏移量,则您描述的行为几乎是预期的行为。Kafka 根本不知道您的程序成功处理了哪些消息。每次重新启动程序时,Kafka 都会看到您的程序尚未提交任何偏移量,并将应用您在 中提供的策略spring.kafka.consumer.auto-offset-reset=earliest,这意味着队列中的第一条消息。

如果这是所有新的给你,我建议读了卡夫卡和文档Spring文档,因为卡夫卡比其他消息代理很大的不同。

  • 如果你设置了`AckMode.RECORD`,当监听器正常返回时,容器会为你提交偏移量;默认的 AckMode 是 BATCH,这意味着在处理完轮询的所有结果后将提交偏移量。这减少了网络活动,但可能意味着在失败后可能会重复交付。`AckMode.MANUAL` 类似地在批处理结束时进行提交。对于最新版本的 spring-kafka (&gt;= 1.3),`AckMode.MANUAL_IMMEDIATE` 意味着 `acknowledge()` 将导致立即提交偏移量。 (2认同)

归档时间:

查看次数:

5011 次

最近记录:

7 年,2 月 前