在 AWS 上监控和扩展基于 Docker 的 Celery 工作线程集群

gru*_*gru 4 amazon-web-services celery docker amazon-elastic-beanstalk

所以我有一个 docker 镜像,它通过主管运行一个 celery worker,并且在单 docker Elastic Beanstalk 上工作得很好(相当长的任务,所以acks late = trueconcurrency = 1prefetch multiplier = 1)。

问题是我想根据工作人员的有效任务负载来扩展实例,而 EB 只允许整体网络和 CPU 负载。

添加一个规则来扩大 CPU 负载工作正常,但我不能保证 EB 在任务中间不会决定缩小。这将触发docker stop并有效地杀死任何无法在短时间内完成的正在运行的芹菜(如果我没记错的话是 10 秒)。

理想情况下,我需要一个基于 CPU 活动和队列中任务的监视器,伪代码如下:

while check interval has passed if task queue is empty (or workers are not busy) if running instances is greater than 1 scale down 1 instance else if CPU load is higher than threshold scale up 1 instance

现在的问题是,这种逻辑级别在 EB 中似乎无法实现,更有可能在 ECS 上运行,但我不确定以下几点:

  • 我们应该实现什么样的 celery 监视器以及代码应该在哪里运行?例如,通常的celery worker 监视器命令似乎不是监控 worker 忙碌程度的好方法,我们需要处理在 docker 中运行 worker 的额外复杂性
  • 集群扩展功能应该在哪里运行?在与 AWS 工程师交谈后,似乎AWS Lambda可能是一个潜在的解决方案,但将集群实例负载报告挂钩到 lambda 片段似乎非常复杂且难以维护
  • 作为一个额外的问题,如果我们需要迁移到 ECS,我们还需要重写我们的部署脚本以手动触发版本交换,因为这目前由 EB 管理。这样做的最佳方法是什么?

任何帮助表示赞赏,谢谢!

Ani*_*tel 5

您可以利用 AWS elastic beanstalk 服务,您只需要提供 docker 镜像。它还带有仪表板,您可以在其中提供环境变量或根据 CPU/请求/内存等扩展您的应用程序/工作程序。

您已经为您的芹菜工人制作了 docker 图像。因此,而不是在 CPU 或内存上进行扩展。您可以根据队列中的多个任务扩展您的实例。您可以自己设置缩放约束。

以下是计算队列中 celery 任务的不同方法。选择您的选项以将监视放在任务队列中。

使用鼠兔:

import pika

pika_conn_params = pika.ConnectionParameters(
    host='localhost', port=5672,
    credentials=pika.credentials.PlainCredentials('guest', 'guest'),
)
connection = pika.BlockingConnection(pika_conn_params)
channel = connection.channel()
queue = channel.queue_declare(
    queue="your_queue", durable=True,
    exclusive=False, auto_delete=False
)

print(queue.method.message_count)
Run Code Online (Sandbox Code Playgroud)

使用 PyRabbit:

from pyrabbit.api import Client
cl = Client('localhost:55672', 'guest', 'guest')
cl.get_messages('example_vhost', 'example_queue')[0]['message_count']
Run Code Online (Sandbox Code Playgroud)

使用 HTTP

句法:

curl -i -u user:password http://localhost:15672/api/queues/vhost/queue
Run Code Online (Sandbox Code Playgroud)

例子:

curl -i -u guest:guest http://localhost:15672/api/queues/%2f/celery 
Run Code Online (Sandbox Code Playgroud)

注意:默认 vhost 是 / 需要转义为 %2f

使用命令行:

$ sudo rabbitmqctl list_queues | grep 'my_queue'
Run Code Online (Sandbox Code Playgroud)

现在,根据您选择的选项(CLI 或 Python),您可以应用以下解决方案来扩展您的 celery worker。这两种逻辑的共同点是都需要持续运行。

如果选择 CLI。您可以创建一个脚本,该脚本将持续监控任务数量并在超过提供的限制时应用扩展逻辑。如果您正在使用 kubernetes,那么扩展您的部署将非常容易。否则,您将需要遵循系统所需的方法。

使用python,您可以遵循相同的路径,但唯一的优势是如果将来逻辑变得过于复杂,它可以在将来作为服务工作。