MrF*_*fer 8 python redis celery python-asyncio fastapi
我有一个使用 FastAPI 构建的 API,该端点将任务提交给 celery 工作人员,等待工作人员完成其工作并将结果返回给用户。
问题是等待结果的正确方法是什么?
端点代码
from tasks import celery_application, some_task
from celery.result import AsyncResult
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = AsyncResult(id=task.task_id, app=celery_application).get()
return {'task_result': result}
Run Code Online (Sandbox Code Playgroud)
问题在于AsyncResult该get方法会阻塞应用程序,它会同步等待结果,同时 api 会冻结。
我想出的解决方案之一是在循环中检查 n 秒的结果
from tasks import celery_application, some_task
import asyncio
import redis
r = redis.Redis.from_url(REDIS_CONN_URI)
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = None
for _ in range(100):
if r.exists(task.task_id):
result = r.get(task.task_id)
break
await asyncio.sleep(0.3)
return {'task_result': result}
Run Code Online (Sandbox Code Playgroud)
但它只能部分起作用。虽然端点没有被阻止并且可以访问。当端点尝试再次到达发送任务时,它会被阻止。
| 归档时间: |
|
| 查看次数: |
3139 次 |
| 最近记录: |