小编SH2*_*H21的帖子

如何在 celery 任务中使用 asyncio 和 aioredis 锁?

目标:

  1. 可以运行 asyncio 协程。
  2. 纠正异常和任务重试时的 celery 行为。
  3. 可以使用 aioredis 锁。

那么,如何正确运行异步任务来达到目标​​呢?

这是什么RuntimeError: await wasn't used with future(如下),我该如何修复它?


我已经尝试过:

1. 阿斯吉里夫

async_to_sync(来自 asgiref https://pypi.org/project/asgiref/)。

此选项可以运行 asyncio 协程,但重试功能不起作用。

2. 芹菜池异步

https://pypi.org/project/celery-pool-asyncio/

与 asgiref 中的问题相同。(此选项可以运行 asyncio 协程,但重试功能不起作用。)

3.编写自己的异步同步装饰器

我已经尝试创建自己的装饰器,例如运行协程线程安全(asyncio.run_coroutine_threadsafe)的 async_to_sync ,但我的行为如上所述。

4. 异步模块

我还尝试asyncio.run()asyncio.get_event_loop().run_until_complete()(和self.retry(...))内部芹菜任务。这运行良好,任务运行,重试有效,但是协程执行不正确 - 在async函数内部我无法使用 aioredis。

实施注意事项

  • 启动芹菜命令celery -A celery_test.celery_app worker -l info -n worker1 -P gevent …

celery celery-task python-3.x python-asyncio aioredis

4
推荐指数
1
解决办法
5219
查看次数