多台机器上的 Celery 任务

ala*_*uri 2 python task rabbitmq celery celerybeat

我有一台安装了 RabbitMQ 代理的服务器和两个 Celery 消费者(main1.pymain2.py),它们都连接到同一个代理。

在第一个消费者 ( main1.py ) 中,我实现了一个 Celery Beat,它在特定队列上多次发送不同的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
    [
        ('tasks.beat', {'queue': 'print-queue'}),
    ],
)
app.conf.beat_schedule = {
    'beat-every-10-seconds': {
        'task': 'tasks.beat',
        'schedule': 10.0
    },
}

@app.task(name='tasks.beat', bind=True)
def beat(self):
    for i in range(10):
        app.send_task("tasks.print", args=[i], queue="print-queue")

    return None
Run Code Online (Sandbox Code Playgroud)

在第二个消费者(main2.py)中,我实现了上面所说的任务:

app = Celery('tasks', broker=..., backend=...)
app.conf.task_routes = (
    [
        ('tasks.print', {'queue': 'print-queue'}),
    ],
)

@app.task(name='tasks.print', bind=True)
def print(self, name):
    return name
Run Code Online (Sandbox Code Playgroud)

当我启动两个 Celery 工作人员时:

consumer1: celery worker -A main1 -Q print-queue --beat
consumer2: celery worker -A main2 -Q print-queue
Run Code Online (Sandbox Code Playgroud)

我收到这些错误:

[ERROR/MainProcess] Received unregistered task of type 'tasks.print'
Run Code Online (Sandbox Code Playgroud)

在第一个消费者身上

[ERROR/MainProcess] Received unregistered task of type 'tasks.beat'
Run Code Online (Sandbox Code Playgroud)

关于第二个消费者

是否可以在连接到同一代理的不同 Celery 应用程序上拆分任务?

提前致谢!

mba*_*ano 5

这就是正在发生的事情。你有两名工人AB其中一名也恰好在运行 celerybeat(假设其中一名是B)。

  1. celerybeat 提交task.beat到队列。这一切所做的就是在rabbit中加入一些元数据(包括任务名称)的消息。
  2. 两名工人中的一名读到了这条消息。A 和 B 都在监听同一个队列,因此任何一方都可以读取它。

    A。如果 A 读取该消息,它将尝试查找名为此任务的任务,tasks.beat因为 A 没有定义该任务,所以会失败。

    b. 如果 B 读取该消息,它将成功尝试找到调用的任务tasks.beat(因为它确实有该任务)并将运行代码。tasks.beat将在rabbit中排队一条包含元数据的新消息tasks.print

  3. 同样的问题会再次出现,因为 A 和 B 中只有一个定义了tasks.print,但其中任何一个都可能收到消息。

在实践中,芹菜可能会做一些检查以提前抛出错误消息,但我相当确定这是根本问题。

简而言之,队列中的所有工作人员(包括beat)都应该运行相同的代码。