我有一个使用 FastAPI 构建的 API,该端点将任务提交给 celery 工作人员,等待工作人员完成其工作并将结果返回给用户。
问题是等待结果的正确方法是什么?
端点代码
from tasks import celery_application, some_task
from celery.result import AsyncResult
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = AsyncResult(id=task.task_id, app=celery_application).get()
return {'task_result': result}
Run Code Online (Sandbox Code Playgroud)
问题在于AsyncResult该get方法会阻塞应用程序,它会同步等待结果,同时 api 会冻结。
我想出的解决方案之一是在循环中检查 n 秒的结果
from tasks import celery_application, some_task
import asyncio
import redis
r = redis.Redis.from_url(REDIS_CONN_URI)
@api.post('/submit')
async def submit(data: str):
task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
result = None
for _ in range(100):
if r.exists(task.task_id):
result = r.get(task.task_id)
break
await …Run Code Online (Sandbox Code Playgroud) 有没有办法在文件中创建变量列表docker-compose.yml并基于模板为每个变量创建服务?
例如PARAMS=[name_a, name_b, name_c]
创建3个服务:
FOR param IN PARAMS:
my-service-{param}:
build: my-image
container_name: my-container-{param}
environment:
NAME: {param}
SOME_OTHER: ...
Run Code Online (Sandbox Code Playgroud)
这段代码当然不起作用,但我相信功能很清楚。
我想知道是否可以为队列设置不同的预取乘数。
我有 2 个队列,一个队列运行时间很短,另一个队列稍长。较短任务的队列需要优先于其他任务。
为了确保优先级工作可靠,必须在 celery 配置中设置:
task_acks_late = True
worker_prefetch_multiplier = 1
Run Code Online (Sandbox Code Playgroud)
然而,这确实损害了快速任务队列的性能。是否可以配置为,如果工作人员从快速任务队列中获取,worker_prefetch_multiplier则为 4 ,如果工作人员从慢速任务队列中获取,worker_prefetch_multiplier则为 1 ?