编辑
万一其他人处于这种特殊情况,在调整消费者配置后,我得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询较高优先级的主题,除非高为空,否则不会轮询较低优先级:
while(true) {
final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);
final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
if (!consumerRecordsHigh.isEmpty()) {
//process high pri records
} else {
final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
if (!consumerRecordsMed.isEmpty()) {
//process med pri records
Run Code Online (Sandbox Code Playgroud)
轮询超时(该.poll()方法的参数)确定在没有要轮询的记录时等待多长时间。我将每个主题的时间设置为非常短的时间,但是您可以将它设置为较低的优先级较低,以确保它不会在高优先级消息存在时消耗宝贵的周期等待
该max.poll.records配置显然决定的记录,以抢在一个民意调查的最大数量。对于更高的优先级,这也可以设置得更高。
该max.poll.interval.ms配置将确定轮询之间的时间-它应该需要多长时间来处理max.poll.records消息。澄清一下。
另外,我相信暂停/恢复整个消费者/主题可以这样实现:
kafkaConsumer.pause(kafkaConsumer.assignment())
if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
kafkaConsumer.resume(kafkaConsumer.assignment());
}
Run Code Online (Sandbox Code Playgroud)
我不确定这是否是最好的方法,但我在其他地方找不到很好的例子
我同意下面的 senseiwu,这并不是 Kafka 的正确用法。这是单线程处理,每个主题都有一个专门的使用者,但我将从这里开始改进这个过程。
背景
我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统通常是低带宽的(尽管在某些情况下带宽可能会暂时很高),并且具有必须在较大文件等待时处理的小而高优先级的消息,或者缓慢处理以消耗更少的带宽。我们希望有不同优先级的主题。
我是 Kafka 的新手,但尝试研究 Processor API 和 …