use*_*661 7 python django integration celery
我和具有多个芹菜工人软件导向架构的工作(我们姑且称之为worker1,worker2和worker3).所有三个工作者都是独立的实体(即,单独的代码库,单独的存储库,单独的芹菜实例,单独的机器),并且它们都没有连接到Django应用程序.
与这三个工作者中的每一个进行通信的是基于Django的MySQL支持的RESTful API.
在开发过程中,这些服务都在一个流浪盒上,每个服务器都作为一个单独的端口运行的独立机器.我们为所有Celery任务提供了一个RabbitMQ代理.
通过这些服务的典型路径可能如下所示:worker1从设备获取消息,执行某些处理,对任务进行排队worker2,进行进一步处理并对其进行POST API,从而写入MySQL数据库并触发任务on worker3,它执行一些其他处理并对其进行另一个POST,API从而导致MySQL写入.
服务沟通很好,但每次我们对任何服务进行更改时测试此流程都非常烦人.我真的希望得到一些完整的集成测试(即,从发送到worker1整个链的消息开始),但我不知道从哪里开始.我面临的主要问题是:
如果我排队worker1,我怎么能分辨出整个流程何时结束?当我不知道结果是否已经到达时,如何对结果做出合理的断言呢?
如何处理数据库设置/拆除?我希望在每次测试结束时删除测试期间所有条目,但如果我从Django应用程序外部开始测试,我不确定如何有效地清除它.手动删除它并在每次测试后重新创建它似乎可能是太多的开销.
Celery 允许同步运行任务,因此第一步是:将整个流程划分为单独的任务、伪造请求并断言结果:
原始流程:
device --- worker1 --- worker2 --- django --- worker3 --- django
Run Code Online (Sandbox Code Playgroud)
一级集成测试:
1. |- worker1 -|
2. |- worker2 -|
3. |- django -|
4. |- worker3 -|
5. |- django -|
Run Code Online (Sandbox Code Playgroud)
对于每个测试,创建假请求或同步调用并断言结果。将这些测试放入相应的存储库中。例如,在对worker1的测试中,您可以模拟worker2并测试它是否已使用正确的参数进行调用。然后,在另一个测试中,您将调用worker2和模拟请求来检查它是否调用了正确的API。等等。
测试整个流程将会很困难,因为所有任务都是单独的实体。我现在想到的唯一方法是对worker1进行一次假调用,设置合理的超时并等待数据库中的最终结果。这种测试只能告诉你它是否有效。它不会告诉你问题出在哪里。