如何使用apache kafka实现延迟队列?

Mar*_*oni 2 priority-queue apache-kafka

如何在kafka上添加延迟作业?据我所知,它并不涉及每个消息,而是每个主题.我的工作有不同的时间表我希望他们被消费.假设一个将在接下来的4个小时内,另一个将是我12月1日等.

kafka是否有本地或其他第三方方式实现相同的支持?

我正在考虑将Redis用于延迟队列,并在其计划到达后将作业推送到kafka,但如果可能的话,我只想使用一个依赖项.

mju*_*rez 9

这里有点延迟的答案.现在,使用每条消息的新时间戳,最新的Kafka版本0.10+可以从延迟流中消耗.我现在正在使用它来实现连续聚合数据集,而不依赖于外部依赖性.

这些记录通过,并且可能在第一个事件发生后的60分钟内有更新/删除,因此在看到所有更新之前,我不能将其声明为"最终".

所以,为了处理这种情况,我正在使用所有CREATEs/UPDATEs/DELETE两次主题,第一个实时(或尽可能快),第二个延迟90分钟,以确保我不会错过任何东西.在实时消费者身上,我在本地存储了创建所需的所有更新.然后在延迟的消费者上,当我收到特定的"创建"时,我将查找我的本地存储以进行任何更新/删除,更新记录以便它知道它的最终状态,并再次将其生成到Kafka 的最终主题中.

为了确保我不会耗尽磁盘空间,我还不断截断本地存储,因此它最多可以保存两个小时的更新/删除.


Den*_*nko 5

卡夫卡没有工作的概念。它只是一个愚蠢的高性能消息队列服务。根据您的要求,您可以考虑将作业存储在支持按作业执行时间进行索引的存储中,例如RDBMS。然后在某些过程中,定期提取执行时间在[last_check_time,current_time + lookahead_interval]较小范围内的作业,并将其放入Kafka主题中以进行最终处理。