我如何编写测试以确保 celery 任务进入正确的队列

Pri*_*shj 5 python testing celery

由于复杂的回调/链接设置,我们基础设施中的某些任务似乎将进入未分配到的队列。所以我想编写自动化测试来确保 celery 任务被发送到它被指定处理的队列。

设置示例:

from celery import Celery

celery = Celery()

@celery.task(base=MyTask, queue='mytasks.add')
def add(x, y):
    a = x + y
    return a

@celery.task(base=MyTask, queue='mytasks.dadd')
def double_add(a, y):
    b = a + y

def caller(x, y):
    add.apply_async(args=(2, 1), kwargs={''callback': double_add.subtask(args=(3)) })
Run Code Online (Sandbox Code Playgroud)

所以这里的“add”应该由queue='mytasks.add'处理,而“double_add”应该由queue='mytasks.dadd'处理

我了解 celery 基于结果的基本测试,如下所示:如何对 Celery 任务进行单元测试?

但我希望对上述场景的测试过程有任何了解。