Goo*_*ies 5 python asynchronous celery celery-task python-asyncio
我有一个当前使用 asyncio 编写的后端应用程序:用于 Web 服务器的 fastapi,用于异步数据库驱动程序的 sqlalchemy 1.4 + asyncpg。我需要将任务部署给将运行和更新主机应用程序的工作人员。目前我正在使用aio_pika,但想要更强大的东西,例如celerywith flower。
我知道 celery 没有与 asyncio 集成。我也读过这样的答案,我担心的是任务不是异步的,这是微不足道的。我担心从主事件循环内启动任务。
我的主要问题是my_task.delay()/是否完全my_task.apply_async()阻塞正在运行的线程?如果是这样,更好的方法是使用来自中央或 a的多处理工作人员,然后仅从该工作人员进程部署 celery 任务吗?getmp.QueueProcessPoolExecutor
我想要部署任务,并且最好在任务完成时收到通知。不过,这可以通过界面在任务本身内完成fastapi。我只是想确保部署任务不会阻止异步事件循环。
小智 0
我尝试用您链接的帖子(这个)中的答案做一些事情。基本上我拿了他的代码并做了一些修改。对于大多数使用简单任务的情况来说,它似乎可以正常工作,但我想它并不完全安全,这只是我所做的一种解决方法。这是代码:
import asyncio
from celery import Celery
class AsyncCelery(Celery):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.patch_task()
def patch_task(self):
TaskBase = self.Task
class ContextTask(TaskBase):
abstract = True
async def _run(self, *args, **kwargs):
asyncio.set_event_loop(asyncio.get_event_loop())
result = TaskBase.__call__(self, *args, **kwargs)
return await result
def __call__(self, *args, **kwargs):
loop = asyncio.get_event_loop()
try:
return loop.run_until_complete(self._run(*args, **kwargs))
except:
return asyncio.run(self._run(*args, **kwargs))
self.Task = ContextTask
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1774 次 |
| 最近记录: |