Redis - 使用BRPOPLPUSH时更好的清理处理队列(可靠)的方法

asp*_*pak 18 queue reliability redis

我们当前的设计

Env Redis 2.8.17

我们使用类似于redis文档中描述的模式在RPOPLPUSH下实现了可靠的队列

但是,我们使用BRPOPLPUSH考虑其阻塞性质,并使用LPUSH来确保FIFO顺序.

生产者:使用LPUSH推送项目的多个线程(来自多个服务器).

消费者:使用BRPOPLPUSH处理项目的多个线程(来自多个服务器).

BRPOPLPUSH q processing-q
Run Code Online (Sandbox Code Playgroud)

如记录所示,redis从队列'q'弹出项目,同时在'processing-q'中添加它们.

问题

由于我们的应用程序具有多线程(异步)特性,因此当消费者完成处理时,我们无法控制.

因此,如果我们使用LREM(根据文档)从processing-q中删除已处理的元素,这将只删除处理q的顶部元素.在哪里无法保证,是否已经删除了由相应消费者处理的实际元素.

因此,如果我们不做任何事情,处理-q继续增长(吃掉记忆),这是非常糟糕的恕我直言.

有什么建议或想法吗?

per*_*hap 22

您只需要在调用LREM时包含要删除的作业.

LREM采用以下形式:

LREM queue count "object"
Run Code Online (Sandbox Code Playgroud)

它将从队列中删除等于"对象"的计数项.因此,要删除您的消费者线程正在处理的特定作业,您可以执行此类操作.

LREM processing-q 1 "job_identifier"
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅此处的文档:http://redis.io/commands/lrem

然后,为了处理崩溃的使用者和已放弃的作业,您可以使用SETEX创建具有过期的锁定,并定期检查没有锁定的作业.

所以整个过程看起来像这样:

制片人

  1. RPUSH q "job_identifier"

消费者

  1. SETEX lock:processing-q:job_identifier 60 (首先设置锁定以避免竞争条件)
  2. BRPOPLPUSH q processing-queue
  3. 处理工作
  4. LREM processing-queue "job_identifier"

已过期的Jobs Monitor

  1. 工作= LRANGE processing-queue 0 -1
  2. foreach job in jobs:lock = GET lock:processing-q:job_identifier
  3. 如果lock为null,则此作业超时,因此从processing-q中删除 LREM processing-queue "job_identifier"
  4. 然后重试 RPUSH q "job_identifier"

@NotAUser发布了一个开源java实现,在这里:https://github.com/graknlabs/redisq/tree/master/src/main/java/ai/grakn/redisq

  • 我想我错过了一些明显的东西,但是如何在获得工作ID之前使用工作ID设置锁? (3认同)
  • 我不知道我现在是不是傻了,但我还是想不明白。使用“RPUSH”的生产者添加带有“GENERATED_ID”的字符串和作业内容。然后消费者使用完整的作业字符串或仅使用“GENERATED_ID”(由生产者)运行“SETEX”,但是消费者如何在首先轮询“q”之前发出该命令?在这个队列机制之外,“生产者”和“消费者”之间是否还有其他通信? (2认同)

Ita*_*ber 11

我将采用的方法是使用每个消费者的处理-q(例如,processing-q:consumer-id).这将解决您当前的问题,但您仍然需要以某种方式处理崩溃的消费者.为此,我建议您保留每个消费者最后一次弹出任务并定期检查超时.如果使用者已达到超时,则将其任务移回主队列并删除其队列.