mir*_*ran 3 apache-kafka kafka-consumer-api
我正在使用 Kafka Consumer 读取多个主题,我需要其中一个具有更高的优先级。处理需要很多时间,并且(低优先级)主题中总是有很多消息,但我需要尽快处理来自其他主题的消息。
这与Kafka是否支持主题或消息的优先级类似?但这是使用旧 API。
在新的 API (0.10.1.1) 中,有方法
KafkaConsumer::pause(Collection)
KafkaConsumer::resume(Collection)
Run Code Online (Sandbox Code Playgroud)
但是我不清楚,如何有效地检测到高优先级主题中有新消息并且需要暂停其他主题的消费。
任何想法/例子?
最后我解决了这个问题,正如 dawsaw 所建议的那样 - 在处理循环中,我存储了我从中读取的所有主题/分区:
无论何时(endOffset - commited) > 0
对于任何优先主题,我都会调用consumer.pause()
非优先主题,并在(endOffset - commited) == 0
所有优先主题之后再次恢复这些主题。
归档时间: |
|
查看次数: |
5613 次 |
最近记录: |