tim*_*irg 5 python celery pytest flask
测试 Flask 应用程序时,该celery_worker装置不起作用,因为 celery 附带的 pytest 装置不在 Flask 应用程序上下文中运行。
# tasks.py
@current_app.task(bind=True)
def some_task(name, sha):
return Release.query.filter_by(name=name, sha=sha).all()
# test_celery.py
def test_some_celery_task(celery_worker):
async_result = some_task.delay(default_appname, default_sha)
assert len(async_result.get()) == 0
Run Code Online (Sandbox Code Playgroud)
上面的测试将简单地抛出RuntimeError: No application found.并拒绝运行。
通常,当在 Flask 项目中使用 celery 时,我们必须继承celery.Celery并修补该__call__方法,以便实际的 celery 任务将在 Flask 应用程序上下文中运行,如下所示:
def make_celery(app):
celery = Celery(app.import_name)
celery.config_from_object('citadel.config')
class EruGRPCTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return super(EruGRPCTask, self).__call__(*args, **kwargs)
celery.Task = EruGRPCTask
celery.autodiscover_tasks(['citadel'])
return celery
Run Code Online (Sandbox Code Playgroud)
但是看看celery.contrib.pytest,我发现没有简单的方法可以对这些固定装置执行相同的操作,即修改基本 celery 应用程序,以便任务可以在 Flask 应用程序上下文中运行。
我没有使用celery.contrib.pytest,但我想提出不错的解决方案。
首先,您需要将 celery 任务分为sync和async部分。这里有一个例子sync_tasks.py:
def filtering_something(my_arg1):
# do something here
def processing_something(my_arg2):
# do something here
Run Code Online (Sandbox Code Playgroud)
示例async_tasks.py(或您的芹菜任务):
@current_app.task(bind=True)
def async_filtering_something(my_arg1):
# just call sync code from celery task...
return filtering_something(my_arg1)
@current_app.task(bind=True)
def async_processing_something(my_arg2):
processing_something(my_arg2)
# or one more call...
# or one more call...
Run Code Online (Sandbox Code Playgroud)
在这种情况下,您可以为所有功能编写测试,而不依赖于 Celery application:
from unittest import TestCase
class SyncTasks(TestCase):
def test_filtering_something(self):
# ....
def test_processing_something(self):
# ....
Run Code Online (Sandbox Code Playgroud)
有什么好处?
celery app您的测试与和分开flask app。worker_pool连接池或其他brokers问题。celery.contrib.pytest,但您可以通过 100% 的测试覆盖您的代码。mocks。希望这可以帮助。
| 归档时间: |
|
| 查看次数: |
3569 次 |
| 最近记录: |