使用主题交换运行多个 Celery 任务

joe*_*cox 5 python messaging message-queue rabbitmq celery

我正在用 Celery 替换一些自制代码,但很难复制当前的行为。我期望的行为如下:

  • tasks创建新用户时,应使用路由密钥将消息发布到交换器user.created
  • 此消息应触发两个 Celery 任务,即send_user_activate_emailcheck_spam

我尝试通过定义user_created带有参数的任务以及和 的ignore_result=True任务来实现此目的。send_user_activate_emailcheck_spam

在我的配置中,我添加了以下路由和队列定义。当消息传递到user_created队列时,它不会传递到其他两个队列。

理想情况下,消息仅传递到send_user_activate_emailcheck_spam队列。当使用 vanilla RabbitMQ 时,消息被发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列。

我将如何在 Celery 中实现上述行为?

CELERY_QUEUES = {
    'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
    'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}

CELERY_ROUTES = {
    'user_created': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'send_user_activate_email': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
    'check_spam': {
        'queue': 'user_created',
        'routing_key': 'user.created',
        'exchange': 'tasks',
        'exchange_type': 'topic',
    },
}
Run Code Online (Sandbox Code Playgroud)

A S*_*ipt 4

听起来您期望一条消息被两个队列触发/消耗,但这不是 Celery 的工作方式。Exchange 会将任务发布到符合条件的队列,但一旦被消耗,其他队列就会忽略该消息。您想要触发的每个任务都需要一条消息。

Celery 新用户经常会感到困惑,因为该系统中“队列”有两种用途:Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列。当我们发布到队列时,我们会想到 AMQP 队列,这是不正确的。(感谢下面链接的回答)。

回到你的问题,如果我理解正确的话,当 user_created 被消耗时,你希望它再产生两个任务;send_user_activate_email 和 check_spam。此外,这些不应相互依赖;它们可以在不同的机器上并行运行,并且不需要知道彼此的状态。

在这种情况下,您希望 user_created “apply_async”这两个新任务并返回。这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery“组”来实现此目的。该小组提供了一些很好的速记方法,并为您的任务提供了一些结构,因此我个人会推动您朝这个方向发展。

#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
Run Code Online (Sandbox Code Playgroud)

此设置将创建四个消息;一个代表您想要执行的每个任务,再加上一个代表 Group(),它本身就会有一个结果。

就您而言,我不确定 Exchange 或ignore_result 是否必要,但我需要查看任务代码并更多地了解系统才能做出判断。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups http://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing -keys 为什么 CELERY_ROUTES 同时具有“队列”和“routing_key”?

(如果我离题太远,我会删除/删除答案......)