Mav*_*ick 3 spring kafka-consumer-api spring-kafka
我正在尝试使用 @KafkaListener 实现消费者。我使用的是Spring2.3.7版本。
这是到目前为止我的代码,
public class SampleListener {
@KafkaListener(topics = "test-topic",
containerFactory = "sampleKafkaListenerContainerFactory",
groupId = "test-group")
public void onMessage(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long receivedTimestamp,
@Header(KafkaHeaders.OFFSET) long offset,
@Headers MessageHeaders messageHeaders) {
LOGGER.info("Received Message for topic={} partition={} offset={} messageHeaders={}",
topic, partition, offset, messageHeaders);
LOGGER.debug("Received Message payload={}", message);
doSomething(message);
}
}
Run Code Online (Sandbox Code Playgroud)
我是卡夫卡和春天的新手。我阅读了有关如何寻求偏移量的 spring-kafka 文档,但无法完全理解。
据我了解,对于我的用例,当分区分配给容器或在任何其他场景中时,我不想再次读取事件(确保只读取一次)。
我看到大多数 Consumer 实现都实现了ConsumerSeekAware。我知道实施ConsumerSeekAware使我们能够寻求对诸如onIdleContainer或 之类的事件进行抵消onPartitionsAssigned。我无法理解这些正在处理什么场景?
ConsumerSeekAware实施来处理哪些场景?实现 Kafka Consumer 需要寻求抵消的最佳实践或一般场景是什么?
registerSeekCallback和 和有什么区别onPartitionsAssigned?对于两者来说,只要分配分区就会调用它们。这两种方法的callBack有什么区别?
实施ConsumerSeekAware可以让您
A。在初始化期间寻找特定的偏移量(或开始、结束或由时间戳表示的偏移量)。
b. Peform 在应用程序生命周期中的任何时间进行查找。
首选技术是AbstractConsumerSeekAware如果可能的话进行扩展,因为它可以解决大部分底层的复杂性。
如果不需要寻求,那么就不需要实现接口(或扩展抽象类)。
据我了解,对于我的用例,当分区分配给容器或在任何其他场景中时,我不想再次读取事件(确保只读取一次)。
容器将自动为您提交偏移量(默认情况下,当 a 返回所有记录时poll(),但您可以将容器AckMode属性设置为RECORD在处理每个记录后提交偏移量)。
下次启动应用程序时,它将从上次提交的偏移量开始消耗。
2.
onPartitionsAssigned在分配分区时(最初或重新平衡后)调用。如果您在那里执行搜索,它们会在重新平衡期间直接调用消费者。
registerSeekCallback被调用以为应用程序提供一个回调句柄,可以在将来的任意时间调用该回调。如果容器的并发度 > 1,则注册多个回调。当您对这些回调执行查找时,它们会排队等待消费者线程在下一次轮询之前调用。(消费者不是线程安全的)。抽象类为您管理这个并允许更高级别的抽象......
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
Run Code Online (Sandbox Code Playgroud)
在即将发布的 2.6.0 版本(本周发布)中,使用方法seekToBeginning(),它会更容易,seekToEnd()并且seekToTimeStamp()它将对所有分配的分区进行排队搜索。
| 归档时间: |
|
| 查看次数: |
7144 次 |
| 最近记录: |