Kafka是否支持主题或消息的优先级?

avi*_*ned 6 apache-kafka

我正在探索Kafka是否支持任何队列或消息处理的优先级这一事实.

它似乎不支持任何这样的事情.我用Google搜索并发现了这个支持此邮件的邮件存档:http: //mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E

这里有没有人配置Kafka来优先处理任何主题或消息?

Sky*_*Sky 29

Kafka通过其设计,分区和复制的提交日志服务,本质上是一种快速,可扩展的分布式.因此,主题或消息没有优先权.

我也面临同样的问题.解决方案非常简单.在kafka队列中创建主题,让我们说:

1)high_priority_queue

2)medium_priority_queue

3)low_priority_queue

在medium_priority_queue中的high_priority_queue和中优先级消息中发布高优先级消息.

现在,您可以为所有主题创建kafka使用者和开放流.

  val props = new Properties()
  props.put("group.id", groupId)
  props.put("zookeeper.connect", zookeeperConnect)
  val config = new ConsumerConfig(props)
  val connector = Consumer.create(config)
  val topicWithStreamCount = Map(
       "high_priority_queue" -> 1,"medium_priority_queue" ->  1,"low_priority_queue" -> 1)
  val streamsMap = connector.createMessageStreams(topicWithStreamCount)
//this is scala code 
Run Code Online (Sandbox Code Playgroud)

你得到每个主题的流.如果主题没有任何消息,那么你可以先读取high_priority主题,然后回溯到medium_priority_queue主题.如果medium_priority_queue为空,则读取low_priority队列.

这个技巧对我来说很好.可能对你有帮助!!


小智 7

解决方案是根据优先级创建 3 个不同的主题。

  • 高优先级主题
  • 中优先级主题
  • 低优先级主题

根据一般经验,高优先级主题的消费者数量 > 中优先级主题的消费者数量 > 低优先级主题的消费者数量

这样,将保证到达高优先级主题的消息比低优先级主题更快地得到处理。


mik*_*ike 5

Confluent 有一篇关于在 Apache Kafka 中实现消息优先级的博客,其中描述了如何实现消息优先级

首先,重要的是要了解 Kafka 的设计不允许对消息进行优先级排序的开箱即用解决方案。主要原因是:

  • 存储:Kafka 被设计为仅附加提交日志,其中包含反映现实生活事件及时发生的不可变消息。
  • 消费者:Kafka 主题中的消息可以被多个消费者同时消费。每个消费者可能有不同的优先级,这使得无法提前对主题内的消息进行排序。

建议的解决方案是使用GitHub 上提供的Bucket Priority Pattern,可以用 README 中的图表最好地描述。您可以通过自定义生产者的分区器和消费者的分配策略来使用具有多个分区的单个主题,而不是为不同的优先级使用多个主题。

根据消息键,生产者将消息写入正确的优先级桶:

在此处输入图片说明

另一方面,消费者组将自定义其分配策略,并优先从具有最高分区的分区读取消息:

在此处输入图片说明

在您的客户端代码(生产者和消费者)中,您需要启动并调整以下客户端配置。

# Producer
configs.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
   BucketPriorityPartitioner.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

# Consumer
configs.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
   BucketPriorityAssignor.class.getName());
configs.setProperty(BucketPriorityConfig.BUCKETS_CONFIG, "Platinum, Gold");
configs.setProperty(BucketPriorityConfig.ALLOCATION_CONFIG, "70%, 30%");

Run Code Online (Sandbox Code Playgroud)