小编and*_*cho的帖子

在unittest中运行芹菜工人

我有以下设置:

  • Django-Celery项目A注册任务 foo
  • 项目B:使用Celery的send_task来调用 foo
  • 项目A和项目B具有相同的配置:SQS,用于序列化的msgpack,gzip等.
  • 每个项目都位于不同的github存储库中

我在项目A中对"foo"进行了单元测试,完全没有使用Celery,只是foo(1,2,3)断言结果.我知道它有效.

我已经过单元测试,项目B中的send_task发送了正确的参数.

我没有测试,需要你的建议是两个项目之间的集成.我想要一个单元测试:

  • 在项目A的上下文中启动一个worker
  • 使用项目B的代码发送任务
  • 断言,第一步中启动的工作人员使用我在第二步中发送的参数来获取任务,并且该foo函数返回了预期结果.

似乎可以通过使用python的子进程并解析worker的输出来解决这个问题,但这很难看.在这种情况下,推荐的单元测试方法是什么?您可以分享的任何代码段吗?谢谢!

python integration-testing unit-testing celery

6
推荐指数
1
解决办法
582
查看次数