Celery文档提到在Django中测试Celery,但如果你没有使用Django,则没有解释如何测试Celery任务.你怎么做到这一点?
我正在处理的应用程序非常异步.Web应用程序根据用户操作通过芹菜运行许多任务.芹菜任务本身能够启动进一步的任务.
下面显示的代码经常出现在我们的代码库中.
def do_sth():
logic();
if condition:
function1.apply_async(*args)
else:
function2.apply_asynch(*args)
Run Code Online (Sandbox Code Playgroud)
现在我们要开始对我们编写的任何新代码进行单元测试,但我们不确定如何执行此操作.我们想在我们的断言pytest单元测试是我们希望看到的,如果功能1实际上得到调用.我们不希望自己运行,function1因为我们将进行单元测试function1.
我不希望将celery作为进程运行,也不想在单元测试会话期间运行任何AMQP代理.
这可以实现吗?
编辑
有人指出,这是一个副本如何单元测试Celery任务?
它不是.想一想.我要问的是如何测试函数是否通过apply_async 调用了function1.那个问题是我如何测试function1本身.有一个主要的区别.在构建这个问题之前,我确实遇到了这个问题.