Pri*_*shj 5 python testing celery
由于复杂的回调/链接设置,我们基础设施中的某些任务似乎将进入未分配到的队列。所以我想编写自动化测试来确保 celery 任务被发送到它被指定处理的队列。
设置示例:
from celery import Celery
celery = Celery()
@celery.task(base=MyTask, queue='mytasks.add')
def add(x, y):
a = x + y
return a
@celery.task(base=MyTask, queue='mytasks.dadd')
def double_add(a, y):
b = a + y
def caller(x, y):
add.apply_async(args=(2, 1), kwargs={''callback': double_add.subtask(args=(3)) })
Run Code Online (Sandbox Code Playgroud)
所以这里的“add”应该由queue='mytasks.add'处理,而“double_add”应该由queue='mytasks.dadd'处理
我了解 celery 基于结果的基本测试,如下所示:如何对 Celery 任务进行单元测试?
但我希望对上述场景的测试过程有任何了解。
| 归档时间: |
|
| 查看次数: |
244 次 |
| 最近记录: |