kiw*_*ski 5 priority-queue apache-kafka apache-kafka-streams
编辑
万一其他人处于这种特殊情况,在调整消费者配置后,我得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询较高优先级的主题,除非高为空,否则不会轮询较低优先级:
while(true) {
final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);
final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
if (!consumerRecordsHigh.isEmpty()) {
//process high pri records
} else {
final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
if (!consumerRecordsMed.isEmpty()) {
//process med pri records
Run Code Online (Sandbox Code Playgroud)
轮询超时(该.poll()方法的参数)确定在没有要轮询的记录时等待多长时间。我将每个主题的时间设置为非常短的时间,但是您可以将它设置为较低的优先级较低,以确保它不会在高优先级消息存在时消耗宝贵的周期等待
该max.poll.records配置显然决定的记录,以抢在一个民意调查的最大数量。对于更高的优先级,这也可以设置得更高。
该max.poll.interval.ms配置将确定轮询之间的时间-它应该需要多长时间来处理max.poll.records消息。澄清一下。
另外,我相信暂停/恢复整个消费者/主题可以这样实现:
kafkaConsumer.pause(kafkaConsumer.assignment())
if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
kafkaConsumer.resume(kafkaConsumer.assignment());
}
Run Code Online (Sandbox Code Playgroud)
我不确定这是否是最好的方法,但我在其他地方找不到很好的例子
我同意下面的 senseiwu,这并不是 Kafka 的正确用法。这是单线程处理,每个主题都有一个专门的使用者,但我将从这里开始改进这个过程。
背景
我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统通常是低带宽的(尽管在某些情况下带宽可能会暂时很高),并且具有必须在较大文件等待时处理的小而高优先级的消息,或者缓慢处理以消耗更少的带宽。我们希望有不同优先级的主题。
我是 Kafka 的新手,但尝试研究 Processor API 和 Kafka Streams 都没有成功,尽管论坛上的某些帖子似乎在说这是可行的。
处理器 API
当我尝试 时Processor API,我尝试KafkaConsumer通过检查是否poll()为空来确定高优先级当前是否正在处理任何内容,然后希望poll()与 Med Priority Consumer 一起处理,但第二个主题轮询返回空。此外,还似乎没有一种简单的方法来获取所有TopicPartition“,以S于一个话题来调用kafkaConsumer.pause(partitions)。
卡夫卡流
当我尝试时KafkaStreams,我设置了一个流以从我的每个“优先级”主题中使用,但是无法检查连接到更高优先级主题的KStreamorKafkaStreams实例当前是否处于空闲或正在处理中。
我的代码基于这个文件
其他
我还尝试了这里的代码:priority-kafka-client,但它没有按预期工作,因为运行下载的测试文件具有混合优先级。
我找到了这个线程,其中一位开发人员说(解决为主题添加优先级):“......用户可以通过暂停和恢复来实现这种行为”。但我无法弄清楚他是如何表示这可以奏效的。
我找到了这篇StackOverflow 文章,但他们似乎使用了一个非常旧的版本,我不清楚他们的映射函数应该如何工作。
结论
如果有人告诉我他们是否认为这是值得追求的事情,我将不胜感激。如果这不是 Apache Kafka 应该如何工作,因为它破坏了从自动主题/分区处理中获得的好处,那很好,我会在别处寻找。然而,有太多的例子人们似乎成功了,我想尝试一下。谢谢你。
这听起来像是应用程序中的一个设计问题 - kafka 最初被设计为提交日志,其中每条消息都以偏移量写入代理,并且各种消费者按照提交的顺序使用它们,延迟非常低,吞吐量很高。鉴于分区而不是主题是 Kafka 中工作分配的基本单元,因此很难在本地实现主题级别的优先级。
我建议调整您的设计以使用 Kafka 以外的其他架构组件,而不是尝试切割您的脚以适应鞋子。您已经可以做的一件事是让您的生产者将文件上传到适当的文件存储,并通过 Kafka 发送包含元数据的链接。然后,根据带宽状态,您的消费者可以根据大文件的元数据决定下载是否合理。这样,您可能更有可能拥有稳健的设计,而不是以错误的方式使用 Kafka。
如果您确实只想坚持使用 Kafka,一种解决方案是将大文件发送到某个固定数量的硬编码分区,并且消费者仅在带宽良好时才从这些分区进行消费。
| 归档时间: |
|
| 查看次数: |
1833 次 |
| 最近记录: |