小编kiw*_*ski的帖子

有没有办法在 Apache Kafka 2.0 中对消息进行优先级排序?

编辑

万一其他人处于这种特殊情况,在调整消费者配置后,我得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询较高优先级的主题,除非高为空,否则不会轮询较低优先级:

    while(true) {
        final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
        final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);

        final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
        if (!consumerRecordsHigh.isEmpty()) {
            //process high pri records
        } else {
            final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
            if (!consumerRecordsMed.isEmpty()) {
                //process med pri records
Run Code Online (Sandbox Code Playgroud)

轮询超时(该.poll()方法的参数)确定在没有要轮询的记录时等待多长时间。我将每个主题的时间设置为非常短的时间,但是您可以将它设置为较低的优先级较低,以确保它不会在高优先级消息存在时消耗宝贵的周期等待

max.poll.records配置显然决定的记录,以抢在一个民意调查的最大数量。对于更高的优先级,这也可以设置得更高。

max.poll.interval.ms配置将确定轮询之间的时间-它应该需要多长时间来处理max.poll.records消息。澄清一下

另外,我相信暂停/恢复整个消费者/主题可以这样实现:

    kafkaConsumer.pause(kafkaConsumer.assignment())
    if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
Run Code Online (Sandbox Code Playgroud)

我不确定这是否是最好的方法,但我在其他地方找不到很好的例子

我同意下面的 senseiwu,这并不是 Kafka 的正确用法。这是单线程处理,每个主题都有一个专门的使用者,但我将从这里开始改进这个过程。


背景

我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统通常是低带宽的(尽管在某些情况下带宽可能会暂时很高),并且具有必须在较大文件等待时处理的小而高优先级的消息,或者缓慢处理以消耗更少的带宽。我们希望有不同优先级的主题。

我是 Kafka 的新手,但尝试研究 Processor API 和 …

priority-queue apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
1833
查看次数