Kafka 消费者 - 具有更高优先级的主题

mir*_*ran 3 apache-kafka kafka-consumer-api

我正在使用 Kafka Consumer 读取多个主题,我需要其中一个具有更高的优先级。处理需要很多时间,并且(低优先级)主题中总是有很多消息,但我需要尽快处理来自其他主题的消息。

这与Kafka是否支持主题或消息的优先级类似但这是使用旧 API。

在新的 API (0.10.1.1) 中,有方法

KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
Run Code Online (Sandbox Code Playgroud)

但是我不清楚,如何有效地检测到高优先级主题中有新消息并且需要暂停其他主题的消费。

任何想法/例子?

mir*_*ran 7

最后我解决了这个问题,正如 dawsaw 所建议的那样 - 在处理循环中,我存储了我从中读取的所有主题/分区:

  • 开始偏移
  • 结束偏移
  • 承诺 - 我不能使用位置,因为我订阅了主题,而不是分区。

无论何时(endOffset - commited) > 0对于任何优先主题,我都会调用consumer.pause()非优先主题,并在(endOffset - commited) == 0所有优先主题之后再次恢复这些主题。