Redis可靠的队列,用于多线程处理

Mop*_*ath 5 redis reliable-message-delivery

对于我正在进行的项目,我使用Redis来跨多个进程分发消息.现在,我应该让它们变得可靠.

我考虑通过BRPOPLPUSH命令使用Reliable队列模式.此模式表明处理线程在作业成功完成后,通过lrem命令从"处理列表"中删除了额外的消息副本.

当我使用多个线程弹出时,弹出项的额外副本将从多个线程进入处理列表.也就是说,处理队列包含由多个线程弹出的元素.因此,如果线程完成其作业,则无法知道要从"处理队列"中删除哪个项目.

为了克服这个问题,我想我应该基于threadId维护多个处理队列(每个线程一个).所以,我的BRPOPLPUSH将是:

BRPOPLPUSH <primary-queue> <thread-specific-processing-queue>
Run Code Online (Sandbox Code Playgroud)

然后,为了清理timedout对象,我的监视线程必须监视所有这些特定于线程的处理队列.

对于这个问题,有没有比上面提出的方法更好的方法?

Mop*_*ath 3

@用户779159

为了支持可靠的队列机制,我们采取以下方法:

 - two data structures
    -- Redis List (the original queue from which items are popped regularly)
    -- a Redis z-set, which temporarily stores the popped item.
Run Code Online (Sandbox Code Playgroud)

算法:

-- When an element is popped, we store in z-set 
-- If the task that picked the item completed its job, it will delete the entry from z-set.
-- If the task couldn't complete it, the item will be hanging around in z-set. So we know, whether a task was done within expected time or not.
-- Now, another background process periodically scans this z-set, picks up items which are timedout, and then puts them back to queue
Run Code Online (Sandbox Code Playgroud)

它是如何完成的:

  • 我们使用 zset 来存储我们弹出的项目(通常使用 lua 脚本)。
  • 我们存储一个超时值作为该项目的排名/分数。
  • 另一个扫描程序进程将定期(例如每分钟)运行 z-set 命令 zrangebyscore,以选择(现在和最后 1 分钟)之间的项目。
  • 如果上述命令找到了项目,则意味着弹出该项目(通过 brpop)的进程尚未及时完成其任务。
  • 因此,第二个进程会将项目放回到它原来所属的队列(redis 列表)中。