Par*_*xis 6 python unit-testing celery
我知道这将被视为重复,但是在问这个问题之前我已经环顾四周,但是所有问题似乎都已过时或对我的问题没有任何帮助。这是我写这个问题之前看过的地方:
我目前正在从事一个大量使用Celery处理异步任务的项目;为了使整个代码库稳定,我正在为整个项目编写单元测试,但是到目前为止,我还无法为Celery 编写单个工作测试。
我的大多数代码都需要跟踪运行的任务,以便确定是否已准备好查询所有结果。这在我的代码中实现如下:
@app.task(bind=True)
def some_task(self, record_id):
associate(self.request.id, record_id) # Not the actual DB code, but you get the idea
# Somewhere else in my code, eg: Flask endpoint
record = some_db_record()
some_task.apply_async(args=[record.id])
Run Code Online (Sandbox Code Playgroud)
由于我没有基于* nix的计算机来运行我的代码,因此我尝试通过将always eager选项设置为true来解决此问题,但是,当任何子任务尝试查询结果时,都会引起问题:
@app.task(bind=True)
def foo(self):
task = bar.apply_async()
foo_poll.apply_async(args=[task.id])
@app.task(bind=True, max_retries=None):
def foo_poll(self, celery_id)
task = AsyncResult(celery_id)
if not task.ready(): # RuntimeError: Cannot retrieve result with task_always_eager enabled
return self.retry(countdown=5)
else:
pass # Do something with the result
@app.task
def bar():
time.sleep(10)
Run Code Online (Sandbox Code Playgroud)
我试图通过修补AsyncResult方法来解决此问题,但这self.request.id会导致以下问题None:
with patch.object(AsyncResult, "_get_task_meta", side_effect=lambda: {"status": SUCCESS, "result": None}) as method:
foo()
@app.task(bind=True)
def foo(self):
pass # self.request.id is now None, which I need to track sub-tasks
Run Code Online (Sandbox Code Playgroud)
有人知道我该怎么做吗?还是Celery甚至值得再使用?我在这里找到了文档以及与测试相关的任何问题,这些问题极其复杂,我只是想将它们全部抛弃,然后回到多线程。
我遇到了同样的问题,并提出了两种可能的方法:
if self.request.called_directly如果 True 或 False,则直接运行任务apply_async。task.ready()和其他状态检查与我检查ALWAYS_EAGER任务准备情况的功能。最终我想出了两者的混合规则,以尽可能避免嵌套任务。并且还放入@app.task尽可能少的代码,以便能够尽可能隔离地测试任务功能。
它可能看起来非常令人沮丧和可怕,但事实上并非如此。
您还可以查看像Sentry这样的大佬是如何做到这一点的(剧透:模拟和一些漂亮的助手)。
所以这绝对是可能的,只是找到一些最佳实践并不是一个简单的方法。
| 归档时间: |
|
| 查看次数: |
1494 次 |
| 最近记录: |