从卡夫卡多次读同一条消息

Ali*_*der 4 java spring apache-kafka kafka-consumer-api spring-kafka

我使用Spring Kafka API实现Kafka使用者手动偏移管理:

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}
Run Code Online (Sandbox Code Playgroud)

在这里,我希望消费者只有在someCondition持有时才提交偏移量.否则,消费者应该睡一段时间并再次阅读相同的消息.

卡夫卡配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}
Run Code Online (Sandbox Code Playgroud)

使用当前配置,如果someCondition == false,consumer不提交偏移量,但仍然读取下一条消息.如果acknowledgement没有执行Kafka,有没有办法让消费者重读一条消息?

Arj*_*jit 7

正如@Gary已经指出的那样,你是在正确的方向,seek()是这样做的方式.当我遇到这个问题时,我今天找不到它的代码示例.这是任何想要解决问题的人的代码.

public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {

    private ConsumerSeekCallback consumerSeekCallback;


    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {

        if (/*some condition*/) {
            //process
            acknowledgment.acknowledge(); //send ack
        } else {

            consumerSeekCallback.seek("your.topic", record.partition(), record.offset());

        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
        this.consumerSeekCallback = consumerSeekCallback;
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

        // nothing is needed here for this program
    }

}
Run Code Online (Sandbox Code Playgroud)


Gar*_*ell 4

您可以停止并重新启动容器,它将重新发送。

在即将发布的 1.1 版本中,您可以寻求所需的偏移量,并且它将重新发送。

但是,如果已经检索到后来的消息,您仍然会首先看到它们,因此您也必须丢弃这些消息。

第二个里程碑具有该功能,我们预计它将在下周发布。