从 Asyncio 事件循环部署 Celery 任务

Goo*_*ies 5 python asynchronous celery celery-task python-asyncio

我有一个当前使用 asyncio 编写的后端应用程序:用于 Web 服务器的 fastapi,用于异步数据库驱动程序的 sqlalchemy 1.4 + asyncpg。我需要将任务部署给将运行和更新主机应用程序的工作人员。目前我正在使用aio_pika,但想要更强大的东西,例如celerywith flower

我知道 celery 没有与 asyncio 集成。我也读过这样的答案,我担心的是任务不是异步的,这是微不足道的。我担心从主事件循环内启动任务。

我的主要问题是my_task.delay()/是否完全my_task.apply_async()阻塞正在运行的线程?如果是这样,更好的方法是使用来自中央或 a的多处理工作人员,然后仅从该工作人员进程部署 celery 任务吗?getmp.QueueProcessPoolExecutor

我想要部署任务,并且最好在任务完成时收到通知。不过,这可以通过界面在任务本身内完成fastapi。我只是想确保部署任务不会阻止异步事件循环。

小智 0

我尝试用您链接的帖子(这个)中的答案做一些事情。基本上我拿了他的代码并做了一些修改。对于大多数使用简单任务的情况来说,它似乎可以正常工作,但我想它并不完全安全,这只是我所做的一种解决方法。这是代码:

import asyncio

from celery import Celery


class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                asyncio.set_event_loop(asyncio.get_event_loop())
                result = TaskBase.__call__(self, *args, **kwargs)
                return await result

            def __call__(self, *args, **kwargs):
                loop = asyncio.get_event_loop()
                try:
                    return loop.run_until_complete(self._run(*args, **kwargs))
                except:
                    return asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask
Run Code Online (Sandbox Code Playgroud)