rzl*_*vmp 12 python concurrency celery
我的示例代码是:
\nfrom celery import Celery, Task\n\napp = Celery(\n "app",\n task_serializer=\'json\',\n accept_content=[\'json\'],\n result_serializer=\'json\',\n timezone=\'Asia/Tokyo\',\n enable_utc=True,\n backend=\'redis://127.0.0.1:6369/2\',\n broker=\'redis://127.0.0.1:6369/3\',\n include=[\'app.tasks\']\n)\n\napp.conf.task_routes = {\'app.tasks.long_run\': {\'queue\': \'long_running_task\'}}\n\n\nclass VerboseTask(Task):\n\n def on_failure(self, exc, task_id, args, kwargs, einfo):\n print("-------------------------------------------")\n print(\n f"FAILURE({task_id})\xef\xbc\x9a"\n f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"\n )\n print("-------------------------------------------")\n\n def on_success(self, retval, task_id, args, kwargs):\n print("-------------------------------------------")\n print(f"SUCCESS({task_id})\xef\xbc\x9a{self.request.task}{kwargs}")\n print("-------------------------------------------")\n\n def on_retry(self, exc, task_id, args, kwargs, einfo):\n print("-------------------------------------------")\n print(f"RETRY({task_id})\xef\xbc\x9a{self.request.task}{kwargs}")\n print("-------------------------------------------")\n\nRun Code Online (Sandbox Code Playgroud)\nfrom run import app, VerboseTask\nfrom time import sleep\n\n\nclass TSK:\n\n name = \'tsk\'\n\n @app.task(bind=True, base=VerboseTask)\n def short_run(self, **kwargs):\n for i in range(0, 3):\n print(f\'{self.request.task} [{self.request.id[0:5]}] \xe2\x86\x92 {i*5} ~ {(i+1)*5}\')\n sleep(5)\n print(\'short_run finished\')\n\n @app.task(bind=True, base=VerboseTask)\n def long_run(self, **kwargs):\n for i in range(0, 10):\n print(f\'{self.request.task} [{self.request.id[0:5]}] \xe2\x86\x92 {i*5} ~ {(i+1)*5}\')\n sleep(5)\n print(\'long_run finished\')\n\nRun Code Online (Sandbox Code Playgroud)\n我用芹菜
\ncelery -A run worker -l info --concurrency=4 -Q long_running_task,celery\nRun Code Online (Sandbox Code Playgroud)\n问题是当我调用long_running_task四次然后执行常规celery任务时,会long_running_task消耗所有工作人员并且celery任务需要等待四个长时间运行的任务。
我想保留至少 2 个工作人员来执行短期任务。例如,如果当前任务是两个短期运行任务和一个长期运行任务,则只有一个工作线程可用,并且只有短期运行任务可以运行。长时间运行的任务必须等待,而三个工作人员将同时空闲以消耗其中之一。
\n换句话说,限制长时间运行任务的最大并发度也是2一种可能的解决方案。
我知道我可能会运行两个具有独立并发性的 Celery 实例:
\ncelery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task\ncelery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery\nRun Code Online (Sandbox Code Playgroud)\n但
\nlong_running_task工作人员来完成其他任务(这不是那么重要的条件,可能会被忽略)ENTRYPOINT(换句话说,我不想通过运行单独的命令来分隔工作人员。我只想执行celery -A run worker ...一次)有什么方法可以为满足我的条件的工作人员定制限制和并发数吗?
\n您可以让一个工作人员从多个队列中消费。如果您有一个 high_priority 队列和一个 long_running 队列,则可以为 high_priority 队列设置一个专用工作线程,以确保始终有一个工作线程准备好接收这些任务,然后让其他工作线程同时消耗 high_priority 和 long_running 任务。
就入口点而言,只需为入口点使用 shell 脚本,它接受一个参数(要从中使用的队列列表),因此您只有 1 个 docker 映像,但有 2 个部署:1 个用于 high_priority 队列,1 个用于两个队列,以及您可以根据负载单独缩放每个。
| 归档时间: |
|
| 查看次数: |
2501 次 |
| 最近记录: |