将异步协程作为 celery 任务运行

Ale*_*ets 6 python tornado celery async-await python-asyncio

我希望能够在我的 celery 任务中使用异步代码。它适用于asyncio 或tornado。我发现我可以做这样的事情


from tornado.ioloop import IOLoop

from celery._state import _task_stack

from . import celery


class AsyncTask(celery.Task):

    def __call__(self, *args, **kwargs):
        _task_stack.push(self)
        self.push_request(args=args, kwargs=kwargs)
        try:
            IOLoop.current().run_sync(lambda: self.run(*args, **kwargs))
        finally:
            self.pop_request()
            _task_stack.pop()
Run Code Online (Sandbox Code Playgroud)

然后像这样使用它

from .celery import celery
from tornado.httpclient import AsyncHTTPClient

@celery.task(base=AsyncTask)
async def test_async_celery_task(x, y):
    result = await AsyncHTTPClient().fetch(request='https://google.com.ua')
    print('Async IS OKAY: {}'.format(result))
Run Code Online (Sandbox Code Playgroud)

或者我可以run_sync直接在任务中使用,这不是更好的选择

我想知道执行上述操作是否可以,或者我应该在工作人员中启动事件循环并通过add_future. 还有其他人做过类似的事情吗?我可以希望性能有所提高吗?

我需要这个,因为我需要能够使用项目其他部分的异步代码,例如数据库调用等

小智 0

并不是 ioloop 的正确使用让事情变得复杂,而是让 Celery 知道它可以而且应该使用异步任务。Celery 与 Pools 一起运行,它可以在其中安排您的工作。池可以是线程的,可以是多进程的,Celery 知道(通过配置)它有 X 个进程、Y 个线程、Z 个工作线程,其中有多少个有工作或空闲。但是 ATM,Celery 从技术上来说能够接收和运行协程,但是开箱即用无法对它们进行计数以跟踪其中有多少有工作或空闲。如果你想看看它是什么样子的 - 这里有未来 Celery 的 Asyncpool 的开发代码