asp*_*pak 18 queue reliability redis
Env Redis 2.8.17
我们使用类似于redis文档中描述的模式在RPOPLPUSH下实现了可靠的队列
但是,我们使用BRPOPLPUSH考虑其阻塞性质,并使用LPUSH来确保FIFO顺序.
生产者:使用LPUSH推送项目的多个线程(来自多个服务器).
消费者:使用BRPOPLPUSH处理项目的多个线程(来自多个服务器).
BRPOPLPUSH q processing-q
Run Code Online (Sandbox Code Playgroud)
如记录所示,redis从队列'q'弹出项目,同时在'processing-q'中添加它们.
由于我们的应用程序具有多线程(异步)特性,因此当消费者完成处理时,我们无法控制.
因此,如果我们使用LREM(根据文档)从processing-q中删除已处理的元素,这将只删除处理q的顶部元素.在哪里无法保证,是否已经删除了由相应消费者处理的实际元素.
因此,如果我们不做任何事情,处理-q继续增长(吃掉记忆),这是非常糟糕的恕我直言.
有什么建议或想法吗?
per*_*hap 22
您只需要在调用LREM时包含要删除的作业.
LREM采用以下形式:
LREM queue count "object"
Run Code Online (Sandbox Code Playgroud)
它将从队列中删除等于"对象"的计数项.因此,要删除您的消费者线程正在处理的特定作业,您可以执行此类操作.
LREM processing-q 1 "job_identifier"
Run Code Online (Sandbox Code Playgroud)
有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem
然后,为了处理崩溃的使用者和已放弃的作业,您可以使用SETEX创建具有过期的锁定,并定期检查没有锁定的作业.
所以整个过程看起来像这样:
制片人
RPUSH q "job_identifier"消费者
SETEX lock:processing-q:job_identifier 60 (首先设置锁定以避免竞争条件)BRPOPLPUSH q processing-queueLREM processing-queue "job_identifier"已过期的Jobs Monitor
LRANGE processing-queue 0 -1GET lock:processing-q:job_identifier LREM processing-queue "job_identifier"RPUSH q "job_identifier"@NotAUser发布了一个开源java实现,在这里:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq
Ita*_*ber 11
我将采用的方法是使用每个消费者的处理-q(例如,processing-q:consumer-id).这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者.为此,我建议您保留每个消费者最后一次弹出任务并定期检查超时.如果使用者已达到超时,则将其任务移回主队列并删除其队列.