max*_*max 25 python asynchronous celery python-3.x python-asyncio
如何创建一个使芹菜任务看起来像的包装器asyncio.Task?或者有更好的方法将Celery与asyncio?
Celery的创建者@asksol 说:
在异步I/O框架之上使用Celery作为分布式层是很常见的(最重要的提示:将CPU绑定任务路由到prefork worker意味着它们不会阻止您的事件循环).
但我找不到专门针对asyncio框架的代码示例.
Joh*_*fis 19
如官方网站所述,Celery 5.0版本可以实现:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
- 如果我们计划利用新的asyncio库,Celery的下一个主要版本将仅支持Python 3.5.
- 删除对Python 2的支持将使我们能够删除大量的兼容性代码,并且使用Python 3.5可以让我们利用键入,async/await,asyncio和类似的概念,在旧版本中别无选择.
以上引用了上一个链接.
所以最好的办法是等待5.0版本发布!
同时,快乐编码:)
jua*_*nra 14
这种简单的方法对我来说很好用:
import asyncio
from celery import Celery
app = Celery('tasks')
async def async_function(param1, param2):
# more async stuff...
pass
@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
asyncio.run(async_function(param1, param2))
Run Code Online (Sandbox Code Playgroud)
Ben*_*hon 10
这是一个简单的帮助器,您可以使用它来使 Celery 任务等待:
import asyncio
from asgiref.sync import sync_to_async
# Converts a Celery tasks to an async function
def task_to_async(task):
async def wrapper(*args, **kwargs):
delay = 0.1
async_result = await sync_to_async(task.delay)(*args, **kwargs)
while not async_result.ready():
await asyncio.sleep(delay)
delay = min(delay * 1.5, 2) # exponential backoff, max 2 seconds
return async_result.get()
return wrapper
Run Code Online (Sandbox Code Playgroud)
与 一样sync_to_async,它可以用作直接包装器:
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await task_to_async(get_answer)()
Run Code Online (Sandbox Code Playgroud)
...作为装饰者:
@task_to_async
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await get_answer()
Run Code Online (Sandbox Code Playgroud)
当然,这不是一个完美的解决方案,因为它依赖于轮询。然而,在Celery 官方提供更好的解决方案之前,从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法 。
编辑 2021/03/02:添加了调用sync_to_async以支持eager mode。
您可以使用文档中run_in_executor所述将任何阻塞调用包装到任务中,我还在示例中添加了自定义超时:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10139 次 |
| 最近记录: |