Apache Kafka:正好在0.10版本中

Him*_*dar 2 apache-kafka kafka-consumer-api

为了实现Kafka消费者对消息的一次性处理,我一次只提交一条消息,如下所示

public void commitOneRecordConsumer(long seconds) {
        KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();

        try {

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                try {
                    for (ConsumerRecord<String, String> record : records) {

                        processingService.process(record);

                        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));

                        System.out.println("Committed Offset" + ": " + record.offset());

                    }
                } catch (CommitFailedException e) {
                    // application specific failure handling
                }
            }
        } finally {
            consumer.close();
        }
    }
Run Code Online (Sandbox Code Playgroud)

上面的代码将消息的处理异步委托给下面的另一个类.

@Service
public class ProcessingService {

    @Async
    public void process(ConsumerRecord<String, String> record) throws InterruptedException {
        Thread.sleep(5000L);
        Map<String, Object> map = new HashMap<>();
        map.put("partition", record.partition());
        map.put("offset", record.offset());
        map.put("value", record.value());
        System.out.println("Processed" + ": " + map);
    }

}
Run Code Online (Sandbox Code Playgroud)

但是,这仍然不能保证一次交付,因为如果处理失败,它可能仍然提交其他消息,并且以前的消息将永远不会被处理和提交,这里有什么选择?

Mat*_*Sax 8

0.10.2及更早版本的原始答案(0.11及更高版本见答案)

目前,Kafka无法提供开箱即用的一次性处理.如果在成功处理消息后提交消息,则可以进行至少一次处理,或者如果poll()在开始处理之前直接提交消息,则可以进行最多一次处理.

(另见http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits中的"交付保证"一节)

但是,如果您的处理是幂等的,那么至少一次保证"足够好",即即使您处理两次记录,最终结果也是相同的.幂等处理的示例是将消息添加到键值存储.即使您添加相同的记录两次,第二个插入将只替换第一个当前键值对,KV存储仍将包含正确的数据.

在上面的示例代码中,您更新了a HashMap,这将是幂等操作.即使您在失败的情况下可能具有不一致的状态,例如put在崩溃之前仅执行两次调用.但是,这种不一致状态将在再次处理同一记录时得到修复.

调用println()不是幂等的,因为这是一个具有"副作用"的操作.但我想打印仅用于调试目的.

作为替代方案,您需要在用户代码中实现事务语义,这需要在发生故障时"撤消"(部分执行)操作.一般来说,这是一个难题.

Apache Kafka 0.11+的更新(对于0.11之前的版本,请参阅上面的答案)

从0.11开始,Apache Kafka支持使用Kafka Streams的幂等生成器,事务生成器和完全一次处理.它还"read_committed"向使用者添加一种模式,仅读取已提交的消息(以及删除/过滤中止的消息).