Ogr*_*gre 5 java kotlin apache-kafka
我在理解如何为我使用的每条记录正确手动提交时遇到了一些麻烦。
首先,让我们看一个来自https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html的例子
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
Run Code Online (Sandbox Code Playgroud)
此示例仅在处理了轮询中收到的所有记录后才提交。我认为这不是一个很好的方法,因为如果我们收到三个记录,而我的服务在处理第二个记录时死亡,它最终将再次消耗第一条记录,这是不正确的。
因此,还有第二个示例涵盖了在每个分区的基础上提交记录:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
Run Code Online (Sandbox Code Playgroud)
但是,我认为这遇到了同样的问题,它仅在处理来自特定分区的所有记录后才提交。
我设法想出的解决方案是:
val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
consumer.subscribe(listOf("some-topic"))
while (true) {
val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
if (!records.isEmpty) {
mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
records.forEach {
mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
processEvent(it.value()) // Complex event processing occurs in this function
consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata (it.offset() + 1)))
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在这在我测试时似乎有效。到目前为止,在我的测试过程中,似乎只使用了一个分区(我已经通过记录 record.partitions() 检查了这一点)。
这种方法会导致任何问题吗?消费者 API 似乎没有提供一种在不指定分区的情况下提交偏移量的方法,这对我来说似乎有点奇怪。我在这里错过了什么吗?
承诺的方式没有正确或错误之分。这实际上取决于您的用例和应用程序。
提交每个偏移量可以提供更精细的控制,但它会对性能产生影响。另一方面,您可以每 X 秒异步提交一次(就像自动提交一样),并且开销很小,但控制却少得多。
在第一个示例中,事件被批量处理和提交。就性能而言,这很有趣,但如果出现错误,可以重新处理整个批次。
在第二个示例中,它也是批处理,但仅限于每个分区。这应该会导致批次更小,从而降低性能,但在出现问题时会减少重新处理。
在最后一个示例中,您选择提交每条消息。虽然这提供了最大程度的控制,但它会显着影响性能。此外,与其他情况一样,它并不完全防错。
如果应用程序在处理事件之后但在提交之前崩溃,则在重新启动时,最后一个事件可能会被重新处理(即至少一次语义)。但至少,只有一个事件会受到影响。
如果您想要恰好一次语义,则需要使用Transactional Producer。
| 归档时间: |
|
| 查看次数: |
564 次 |
| 最近记录: |