Celery-链接远程回调

Hus*_*ain 5 python distributed celery

我有三个Celery任务,分别在三个不同的服务器上运行。

  • task.send_push_notification
  • task.send_sms
  • task.send_email

我想设置一个工作流,以便如果发送推送通知失败,我应该尝试发送短信。如果发送短信失败,我应该发送电子邮件。

如果这3个任务及其代码库位于同一服务器上,那么我将按照链接任务的示例进行操作,例如

from celery import chain
from tasks import send_push_notification, send_sms, send_email
import json
# some paylaod
payload = json.dumps({})
res = chain(
    send_push_notification.subtask(payload),
    send_sms.subtask(payload),
    send_email.subtask(payload)
)()
Run Code Online (Sandbox Code Playgroud)

但是任务被保存在3个不同的服务器上!

我试过了

# 1
from celery import chain
from my_celery_app import app
res = chain(
    app.send_task('tasks.send_push_notification', payload),
    app.send_task('tasks.send_sms', payload),
    app.send_task('tasks.send_email', payload)
)()
# Which fails because I am chaining tasks not subtasks
Run Code Online (Sandbox Code Playgroud)

# 2
from celery import chain, subtask
res = chain(
    subtask('tasks.send_push_notification', payload), 
    subtask('tasks.send_sms', payload), 
    subtask('tasks.send_email', payload)
)()
# fails because I am not adding the tasks on the broker
Run Code Online (Sandbox Code Playgroud)

如何才能做到这一点?

更新: 我可以使用linkNOT 做到这一点chain

from celery import subtask
res = app.send_task(
    'tasks.send_push_notification', (payload, ),
    link=subtask(
        'tasks.send_sms', (payload, ),
        link=subtask(
            'tasks.send_email', (payload, ),
        )
    )
)
Run Code Online (Sandbox Code Playgroud)

有很多嵌套。而且由于我实际上需要创建一个数据库驱动的工作流,因此以这种方式创建它会很复杂。

Muh*_*hir 0

为什么不在你的任务中处理它,

def push_notification_task(payload):
    if not send_push_notification(payload):
        sms_notification_task.delay(payload)

def sms_notification_task(payload):
    if not send_sms_notification(payload):
        email_notification_task.delay(payload)

def email_notification_task(payload):
    send_email_notification(payload)
Run Code Online (Sandbox Code Playgroud)

此外,chain将按给定顺序执行所有任务,而您希望仅在第一个任务失败时才运行下一个任务。