use*_*140 11 apache-kafka spring-kafka
I am using Spring Kafka for the first time and I am unable to get manual commits working with the Acknowledgment.acknowledge() method in my consumer code. Please let me know if anything is missing from my consumer configuration or listener code, or whether there is another way to acknowledge the offset based on a condition. What I am looking for: if the offset is not manually committed/acknowledged, the consumer should pick up the same message/offset again.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
@EnableKafka
@Configuration
// Renamed from ConsumerConfig so the class does not clash with the imported
// org.apache.kafka.clients.consumer.ConsumerConfig (a duplicate simple name in
// the same file is a compile error).
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.groupId}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Has no effect while auto-commit is disabled.
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(props));
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}
------------------------
private static int value = 1;

@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {
    if (value % 2 == 0) {
        acknowledgment.acknowledge();
    }
    value++;
}
con*_*nny 19
Set the enable-auto-commit property to false:
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
Set the ack-mode to MANUAL_IMMEDIATE:
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
Then, in your consumer/listener code, you can commit the offset manually, like this:
@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,
        Acknowledgment acknowledgment) {
    System.out.println("Received message: ");
    System.out.println(consumerRecord.value().toString());
    acknowledgment.acknowledge();
}
Update: I created a small POC for this. Take a look here, it might help you.
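The POC itself is not reproduced in this archive, so here is only a rough sketch of the consumer-side configuration this answer describes, combining the two settings above into one configuration class. The class name, bootstrap address, and group id are my assumptions, not taken from the linked project:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Note: in newer spring-kafka versions AckMode lives in
// org.springframework.kafka.listener.ContainerProperties instead.
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@EnableKafka
@Configuration
public class ManualAckConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-ack-group");        // assumed group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);           // step 1: disable auto-commit
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // step 2: manual, immediate acks
        return factory;
    }
}

With the default factory bean name kafkaListenerContainerFactory, the receive(...) listener shown above picks it up without an explicit containerFactory attribute.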
小智 8
You need to do the following:
1) Set the enable-auto-commit property to false
consumerConfigProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
2) Set the ack mode to MANUAL_IMMEDIATE
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
3) For successfully processed records, call acknowledgment.acknowledge();
4) For failed records, call acknowledgment.nack(10); note: the nack method takes a long argument (the sleep time), which should be less than max.poll.interval.ms
Below is some sample code:
@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offsets,
        Acknowledgment acknowledgment) {
    if (value % 2 == 0) {
        acknowledgment.acknowledge();
    } else {
        acknowledgment.nack(10); // sleep time should be less than max.poll.interval.ms
    }
    value++;
}
But that's not how it works in Apache Kafka.
For the currently running consumer we never need to worry about committing offsets; committed offsets only need to be persisted for new consumers joining the same consumer group. The running consumer tracks its position in memory, so nothing on the broker is needed for it.
If you need to re-fetch the same message in the same consumer (perhaps on the next poll), you should consider using the seek() functionality: https://docs.spring.io/spring-kafka/docs/2.0.1.RELEASE/reference/html/_reference.html#seek
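The linked reference does not include an inline snippet, so here is a minimal sketch of that idea: a listener implementing ConsumerSeekAware that rewinds to a failed record's offset so the record is re-delivered on a subsequent poll. The class name SeekingListener and the process() method are assumptions for illustration; the topic placeholder and containerFactory name mirror the question's setup.

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class SeekingListener implements ConsumerSeekAware {

    // The container calls registerSeekCallback on the consumer thread at startup;
    // keep the callback in a ThreadLocal so the listener method can use it.
    private final ThreadLocal<ConsumerSeekCallback> seekCallback = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallback.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // no-op: we only seek from inside the listener
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // no-op
    }

    @KafkaListener(id = "seeking", topics = "${message.topic.name}", containerFactory = "containerFactory")
    public void listen(ConsumerRecord<String, String> record) {
        if (!process(record.value())) {
            // Request a seek back to the failed record's offset on this partition;
            // the container performs the seek and the record is delivered again.
            seekCallback.get().seek(record.topic(), record.partition(), record.offset());
        }
    }

    private boolean process(String value) {
        // hypothetical business logic; return false to force a re-read
        return value != null && !value.isEmpty();
    }
}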