如何在异步RestAPI中等待Celery任务结果?

MrF*_*fer 8 python redis celery python-asyncio fastapi

我有一个使用 FastAPI 构建的 API,该端点将任务提交给 celery 工作人员,等待工作人员完成其工作并将结果返回给用户。

问题是等待结果的正确方法是什么?

端点代码

from tasks import celery_application, some_task
from celery.result import AsyncResult

@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = AsyncResult(id=task.task_id, app=celery_application).get()
    return {'task_result': result}
Run Code Online (Sandbox Code Playgroud)

问题在于AsyncResultget方法会阻塞应用程序,它会同步等待结果,同时 api 会冻结。

我想出的解决方案之一是在循环中检查 n 秒的结果

from tasks import celery_application, some_task
import asyncio
import redis


r = redis.Redis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = None
    for _ in range(100):
        if r.exists(task.task_id):
            result = r.get(task.task_id)
            break
        await asyncio.sleep(0.3)

    return {'task_result': result}
Run Code Online (Sandbox Code Playgroud)

但它只能部分起作用。虽然端点没有被阻止并且可以访问。当端点尝试再次到达发送任务时,它会被阻止。