如何将芹菜与asyncio结合起来?

max*_*max 25 python asynchronous celery python-3.x python-asyncio

如何创建一个使芹菜任务看起来像的包装器asyncio.Task?或者有更好的方法将Celery与asyncio

Celery的创建者@asksol :

在异步I/O框架之上使用Celery作为分布式层是很常见的(最重要的提示:将CPU绑定任务路由到prefork worker意味着它们不会阻止您的事件循环).

但我找不到专门针对asyncio框架的代码示例.

Joh*_*fis 19

如官方网站所述,Celery 5.0版本可以实现:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. 如果我们计划利用新的asyncio库,Celery的下一个主要版本将仅支持Python 3.5.
  2. 删除对Python 2的支持将使我们能够删除大量的兼容性代码,并且使用Python 3.5可以让我们利用键入,async/await,asyncio和类似的概念,在旧版本中别无选择.

以上引用了上一个链接.

所以最好的办法是等待5.0版本发布!

同时,快乐编码:)

  • 但这并没有发生,而且 celery 5 与 asyncio 不兼容。 (5认同)

jua*_*nra 14

这种简单的方法对我来说很好用:

import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))
Run Code Online (Sandbox Code Playgroud)


Ben*_*hon 10

这是一个简单的帮助器,您可以使用它来使 Celery 任务等待:

import asyncio
from asgiref.sync import sync_to_async

# Converts a Celery tasks to an async function
def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
        return async_result.get()
    return wrapper
Run Code Online (Sandbox Code Playgroud)

与 一样sync_to_async,它可以用作直接包装器:

@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await task_to_async(get_answer)()
Run Code Online (Sandbox Code Playgroud)

...作为装饰者:

@task_to_async
@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await get_answer()
Run Code Online (Sandbox Code Playgroud)

当然,这不是一个完美的解决方案,因为它依赖于轮询然而,在Celery 官方提供更好的解决方案之前,从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法 。

编辑 2021/03/02:添加了调用sync_to_async以支持eager mode


dan*_*ius 6

您可以使用文档中run_in_executor所述将任何阻塞调用包装到任务中,我还在示例中添加了自定义超时

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)
Run Code Online (Sandbox Code Playgroud)