如何确认spring kafka中的当前偏移量以进行手动提交

use*_*140 11 apache-kafka spring-kafka

我第一次使用Spring Kafka而且我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交.如果我的消费者配置或监听器代码中缺少任何内容,请告诉我.或者是否有其他方法根据条件处理确认偏移.在这里,我正在寻找解决方案,如果没有手动提交/确认偏移,它应该由消费者选择相同的消息/偏移.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ConsumerConfig {

@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

@Value(value = "${kafka.groupId}")
private String groupId;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
        props));
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
}
------------------------

private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offsets,
Acknowledgment acknowledgment) {
if (value%2==0){
        acknowledgment.acknowledge();
}
value++;
}
Run Code Online (Sandbox Code Playgroud)

con*_*nny 19

将enable-auto-commit属性设置为false:

propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

将ack-mode设置为MANUAL_IMMEDIATE:

.factory.getContainerProperties()setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

然后,在您的使用者/侦听器代码中,您可以手动提交偏移量,如下所示:

@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,  
        Acknowledgment acknowledgment) {

    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());

    acknowledgment.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)

更新:我为此创建了一个小POC.看看这里,可能会帮助你.

  • 对于Spring 2.3.3,配置是factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); (4认同)

小智 8

您需要执行以下操作

1) 将 enable-auto-commit 属性设置为 false

consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
Run Code Online (Sandbox Code Playgroud)

2)设置ACK模式为MANUL_IMMEDIATE

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
Run Code Online (Sandbox Code Playgroud)

3)对于处理过的记录,你需要调用acknowledge.acknowledge();

4)对于失败的记录调用acknowledgement.nack(10);注意:nack 方法需要一个很长的参数,即睡眠时间,它应该小于 max.poll.interval.ms

下面是一个示例代码

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {

    if (value%2==0){
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); //sleep time should be less than max.poll.interval.ms
    }
    value++;
}
Run Code Online (Sandbox Code Playgroud)


Art*_*lan 2

但在 Apache Kafka 中则不然。

对于当前正在运行的消费者,我们可能永远不会担心提交偏移量。我们只需要为同一消费者群体中的新消费者保留它们。当前的跟踪其在内存中的偏移量。我猜是在《经纪人》的某个地方。

如果您需要在同一消费者中重新获取相同的消息(可能是下一轮轮询),您应该考虑使用seek()功能:https ://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html /_reference.html#seek