Celery + Gevent 池在执行 1000 多个任务后挂起

Vim*_*mal 4 python celery gevent celery-task

我们有8个核心,16 GB内存,运行celery的Linux服务器,它运行一个celery工作队列myQueue,并在gevent池下以1000并发运行。

执行任务大约 1 小时后,worker 突然冻结,它不接受新任务,这里celery beat 是我们对 celery 的配置

App =  Celery('tasks')
class Conf:
   BROKER_URL   = 'amqp://<user>:<pass>@<host>:<port>/<vhost>'
   CELERY_IGNORE_RESULT = True
   CELERY_IMPORTS = ("worker_class",)
   CELERYBEAT_SCHEDULE = {
       'RunTask':{
           'task': 'tasks.worker.MyWorker',
           'schedule' : timedelta(minutes=5)
       }
   }
Run Code Online (Sandbox Code Playgroud)

App.config_from_object(会议)

我们正在运行芹菜,如下所示

celery worker --workdir=tasks/ -A worker -P gevent -c 1000 -Q myQueue --loglevel=INFO
Run Code Online (Sandbox Code Playgroud)

还有人可以解释一下我如何使用 gevent 池吗celery multi

Ami*_*deh 5

要使用以下命令指定池类型celery multi

celery -A myApp multi start 4 -l INFO -P gevent -c 1000 -Q myQueue
Run Code Online (Sandbox Code Playgroud)

上面的命令启动 4 个 gevent 工作线程,每个工作线程的并发级别为 1000,并且全部从myQueue消费。

但这并不是乐趣的结束,因为您甚至可以指定每个工作线程的并发性,还可以指定每个工作线程消耗哪个队列。例如:

celery -A myApp multi start 4 -l INFO -P gevent -c:1-3 1000 -c:4 200 -Q:1-2 myQueue1 -Q:3 myQueue2 -Q:4 myQueue3
Run Code Online (Sandbox Code Playgroud)

就像之前我们启动 4 个 gevent Worker 一样,但是现在 Worker 1 到 3 的并发度为 1000,而最后一个 Worker 的并发度为 200。此外,Worker 1 和 2 从myQueue1消费,Worker 3 从myQueue2消费,Worker 4 消费来自myQueue4

注意:这些celery worker选项适用于celery multi.