Bull Queue并发问题

Bri*_*ian 6 javascript concurrency node.js bull.js

我需要帮助来了解Bull Queue(bull.js)如何处理并发作业。

假设我有10个Node.js实例,每个实例都实例化连接到同一Redis实例的Bull Queue:

const bullQueue = require('bull');
const queue = new bullQueue('taskqueue', {...})
const concurrency = 5;
queue.process('jobTypeA', concurrency, job => {...do something...});
Run Code Online (Sandbox Code Playgroud)

这是否意味着在所有10个节点实例中全局最多有5个(并发)类型的并发运行作业jobTypeA?还是我误会了,并发设置是每个节点的实例?

如果一个Node实例指定不同的并发值会怎样?

我可以确定作业不会被多个Node实例处理吗?

Sta*_*sky 6

TL;DR 是:在正常情况下,作业仅被处理一次。如果出现问题(比如 Node.js 进程崩溃),作业可能会被双重处理。

引用 Bull 的官方README.md

重要笔记

队列的目标是“至少一次”的工作策略。这意味着在某些情况下,一个作业可能会被处理多次。这主要发生在工人在整个处理过程中未能为给定作业保持锁定时。

当一个工作人员正在处理一项工作时,它将保持工作“锁定”,以便其他工作人员无法处理它。

了解锁定的工作原理很重要,以防止您的作业失去锁定 -停滞- 并因此重新启动。锁定是通过为lockDurationon 间隔lockRenewTime(通常是 half lockDuration)创建一个锁来在内部实现的。如果lockDuration在可以更新锁定之前过去了,则作业将被视为停止并自动重新启动;它将被双重处理。这可能发生在:

  1. 运行您的作业处理器的 Node 进程意外终止。
  2. 您的作业处理器过于占用 CPU 并导致 Node 事件循环停滞,因此,Bull 无法更新作业锁(有关我们如何更好地检测此问题,请参阅 #488)。您可以通过将作业处理器分解为更小的部分来解决此问题,这样任何一个部分都不会阻塞 Node 事件循环。或者,您可以为 lockDuration 设置传递一个更大的值(权衡是识别真正停滞的作业需要更长的时间)。

因此,您应该始终监听stalled事件并将其记录到您的错误监控系统中,因为这意味着您的作业可能会被双重处理。

作为一种保护措施,有问题的作业不会无限期地重新启动(例如,如果作业处理器离开它的 Node 进程崩溃了),作业将从停滞状态恢复最多maxStalledCount次数(默认值:)1


sli*_*fty 5

由于面临处理器线程过多的问题,我花了很多时间深入研究它。

简而言之,bull 的并发是在队列对象级别,而不是队列级别。

如果您深入研究代码,就会在调用.process队列对象时调用并发设置。这意味着即使在同一个 Node 应用程序中,如果您创建多个队列并.process多次调用,它们也会增加可以处理的并发作业的数量。

一位贡献者发布了以下内容:

是的,当我第一次使用 Bull 时,我也有点惊讶。队列选项永远不会保留在 Redis 中。每个应用程序可以根据需要拥有任意多个队列实例,每个实例可以有不同的设置。并发设置是在注册处理器时设置的,它实际上特定于每个 process() 函数调用,而不是队列。如果您使用命名处理器,则可以多次调用 process()。每次调用都会按并发量(默认为 1)注册 N 个事件循环处理程序(使用 Node 的 process.nextTick())。

所以你的问题的答案是:是的,如果你在多个节点实例中注册进程处理程序,你的进程将由多个节点实例处理。


Man*_*llo 5

Bull 被设计用于以“至少一次”语义同时处理作业,尽管如果处理器工作正常,即没有停止或崩溃,它实际上是在交付“恰好一次”。但是,您可以将最大停止重试次数设置为 0 (maxStalledCount https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queue ),然后语义将是“最多一次”。

话虽如此,我将尝试回答发帖者提出的两个问题:

如果一个 Node 实例指定不同的并发值,会发生什么情况?

我假设你的意思是“队列实例”。如果是这样,则在处理器中指定并发性。如果并发度为 X,则该给定处理器最多会同时处理 X 个作业。

我能否确定作业不会由多个 Node 实例处理?

是的,只要您的作业不崩溃或者您的最大停滞作业设置为 0。


dm0*_*514 2

啊欢迎!这是一个元答案,可能不是您所希望的,而是解决此问题的一般过程:

您可以指定并发参数。然后 Bull 将根据这个最大值并行调用您的处理程序。

我个人不太理解这一点或 bull 提供的保证。因为还不是很清楚:

IMO最重要的是:

我能否确定作业不会由多个 Node 实例处理?

如果独占消息处理是一个不变量,并且会导致您的应用程序不正确,即使有很好的文档,我也强烈建议对库进行尽职调查:p