卡夫卡 - 获得最新抵消的最简单方法

use*_*128 7 apache-kafka

我正在构建一个应用程序,允许动态添加和删除对kafka主题的订阅.添加主题订阅时,我希望每小时运行一次批处理作业,获取所有新消息并将其推送到另一个数据存储区.

我想要了解的是如何获取主题的当前偏移量.一旦添加订阅,我希望下一个批处理作业从订阅的大致时间开始获取所有消息.

举个例子,假设我有一个名为"TopicA"的主题,它不断收到消息.如果我在下午7点15分添加订阅,当批处理作业在晚上8点运行时,我想要从7.15pm开始批量处理所有消息.我很高兴有时间近似 - 7.10,7.20等5或10分钟任何一方都让我无所顾忌.

所以我的预期解决方案是在添加订阅时获取主题的当前偏移量.我看过这个简单的消费者,但我不想参与这个基本用例的所有集群管理软件方面.

我也看过高级消费者.我可能会这样:

consumer.createMessageStreamsByFilter(new Whitelist(topicName)).head.head.offset
Run Code Online (Sandbox Code Playgroud)

我对这种方法的担忧是对"head"的调用实际上是一个流.所以我相信它会阻止等待下一条消息.阻塞是有问题的,因为它可能导致其他订阅排队,直到下一条消息到达.

我很乐意花一些时间来实现后一种方法,但是如果有一种更简单的方法不需要我编写容易出错的并发代码,那么我宁愿不浪费我的时间.

我还需要一种方法来获取自该偏移量以来的所有日志.

Dav*_*ley 3

对获取请求的每个响应都会返回一个“HighWaterMark”,它表示当前正在使用的分区日志中的最新偏移量。因此,理论上,您可以获取给定主题的最早消息或实际上任何消息(假设存在),并从响应中提取 HighWaterMark。有关 HighWaterMark 的更多详细信息,请访问: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse

当然,能否从响应中提取 HighWaterMarkOffset 取决于您的客户端通过其自己的 Kafka API 提供该数据。

  • 我认为不存在全球“最新消息”这样的东西。如果 Kafka 有某种全局同步机制,它就无法扩展...... (2认同)