处理从 Kafka 检索到的每条记录后,提交的正确方法是什么?

Ogr*_*gre 5 java kotlin apache-kafka

我在理解如何为我使用的每条记录正确手动提交时遇到了一些麻烦。

首先,让我们看一个来自https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html的例子

while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }
Run Code Online (Sandbox Code Playgroud)

此示例仅在处理了轮询中收到的所有记录后才提交。我认为这不是一个很好的方法,因为如果我们收到三个记录,而我的服务在处理第二个记录时死亡,它最终将再次消耗第一条记录,这是不正确的。

因此,还有第二个示例涵盖了在每个分区的基础上提交记录:

try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }
Run Code Online (Sandbox Code Playgroud)

但是,我认为这遇到了同样的问题,它仅在处理来自特定分区的所有记录后才提交。

我设法想出的解决方案是:

        val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
        consumer.subscribe(listOf("some-topic"))

        while (true) {
            val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
            if (!records.isEmpty) {
                mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
                records.forEach {
                    mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
                    processEvent(it.value()) // Complex event processing occurs in this function
                    consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata (it.offset() + 1)))
                }
            }
        }
Run Code Online (Sandbox Code Playgroud)

现在这在我测试时似乎有效。到目前为止,在我的测试过程中,似乎只使用了一个分区(我已经通过记录 record.partitions() 检查了这一点)。

这种方法会导致任何问题吗?消费者 API 似乎没有提供一种在不指定分区的情况下提交偏移量的方法,这对我来说似乎有点奇怪。我在这里错过了什么吗?

Mic*_*son 3

承诺的方式没有正确或错误之分。这实际上取决于您的用例和应用程序。

提交每个偏移量可以提供更精细的控制,但它会对性能产生影响。另一方面,您可以每 X 秒异步提交一次(就像自动提交一样),并且开销很小,但控制却少得多。


在第一个示例中,事件被批量处理和提交。就性能而言,这很有趣,但如果出现错误,可以重新处理整个批次。

在第二个示例中,它也是批处理,但仅限于每个分区。这应该会导致批次更小,从而降低性能,但在出现问题时会减少重新处理。

在最后一个示例中,您选择提交每条消息。虽然这提供了最大程度的控制,但它会显着影响性能。此外,与其他情况一样,它并不完全防错。

如果应用程序在处理事件之后但在提交之前崩溃,则在重新启动时,最后一个事件可能会被重新处理(即至少一次语义)。但至少,只有一个事件会受到影响。

如果您想要恰好一次语义,则需要使用Transactional Producer