我可以在不检索所有消息的情况下检索Kafka分区的最新可用偏移吗?

Ale*_*son 6 apache-kafka kafka-consumer-api

查看最新的(v0.10)Kafka Consumer 文档:

" 消费者的位置给出了下一条记录的偏移量.它将大于消费者在该分区中看到的最高偏移量.它会在每次消费者接收数据呼叫时自动前进(长)并收到消息."

有没有办法查询服务器端分区可用的最大偏移量,而不检索所有消息?

我试图实现的逻辑如下:

  1. 每秒查询主题中未决消息的数量(A)
  2. 如果A>阈值,则唤醒将继续检索所有消息并处理它们的处理器
  3. 否则什么都不做(睡1)

我的动机是我需要进行一些批处理,但我希望处理器只在有足够的数据时唤醒(而且我不想两次检索所有数据).

Ewe*_*ava 7

您可以使用该Consumer.seekToEnd()方法运行Consumer.poll(0)以使其生效但立即返回,然后Consumer.position()查找所有已订阅(或已分配)主题分区的位置.这些将是所有分区的当前最终偏移量.这也将开始从代理中获取这些偏移量的一些数据,但如果您随后寻求返回其他位置,则将忽略任何返回的数据.

目前serejja提到的另一种方法是使用旧的简单消费者,尽管这个过程要复杂得多,因为你需要手动找到每个分区的领导者.