如何使用Kubernetes进行扩展工作队列

Tom*_*del 6 python azure docker kubernetes azure-kubernetes

我需要基于docker / python worker的可伸缩队列处理。我的想法转向了kubernetes。但是,我不确定最好的控制器/服务。

基于azure函数,我收到了传入的http流量,将简单消息添加到了存储队列。这些消息需要处理,并将结果反馈到结果队列中。

为了处理那些队列消息,我开发了python代码来循环队列并处理这些作业。每次成功循环之后,该消息将从源队列中删除,并将结果写入结果队列。一旦队列为空,代码就存在。

所以我创建了一个运行python代码的docker镜像。如果启动了多个容器,显然队列会更快地工作。我还实现了新的Azure Kubernetes服务以进行扩展。当我是kubernetes的新手时,我读到了在队列准备工作之前工作队列的工作范式。我简单的yaml模板如下所示:

apiVersion: batch/v1
kind: Job
metadata:
  name: myjob
spec:
  parallelism: 4
  template:
    metadata:
      name: myjob
    spec:
      containers:
      - name: c
        image: repo/image:tag
Run Code Online (Sandbox Code Playgroud)

我现在的问题是,该作业无法重新启动。

通常,队列中充满了一些条目,然后一段时间没有任何反应。然后,更大的队列又会到达,需要尽快处理。当然,我想再次运行该作业,但这似乎是不可能的。另外,如果队列中什么也没有,我想将占用空间减小到最小。

所以我的问题是,在这种情况下我应该使用哪种架构/构造,是否有简单的Yaml示例?

Mic*_*att 6

这可能是一个“愚蠢/ hacky”的答案,但是它简单,健壮,而且我已经在生产系统中使用了几个月了。

我有一个类似的系统,其中的队列有时会被清空,有时会被猛烈撞击。我以类似的方式编写队列处理器,它一次处理队列中的一条消息,如果队列为空,则终止。它设置为在Kubernetes作业中运行。

诀窍是这样的:我创建了一个CronJob来定期启动该作业的一个新实例,并且该作业允许无限的并行性。如果队列为空,它将立即终止(“缩小”)。如果队列被猛击,而最后一个作业尚未完成,则将启动另一个实例(“向上扩展”)。

无需费心查询队列和扩展状态集或其他任何内容,并且如果队列为空,则不会消耗任何资源。您可能需要调整CronJob间隔以微调它对填充队列的反应速度,但是它应该反应良好。


Ric*_*ico 5

这是一种常见模式,有多种方法可以构建解决方案。

一个常见的解决方案是让一个应用程序包含一组工作人员,始终轮询您的队列(这可能是您的 python 脚本,但您需要将其设为服务),并且通常您会希望使用 Kubernetes 部署(可能带有Horizo ​​ntal Pod Autoscaler)基于队列或 CPU 的一些指标。

在您的情况下,您需要使脚本成为守护进程,并轮询队列是否有任何项目(我假设您已经在使用并行性处理竞争条件)。然后使用 Kubernetes 部署来部署此守护进程,然后您可以根据指标或计划进行扩展和缩减。

也已经有针对多种不同语言的作业调度程序。非常受欢迎的一个是Airflow,它已经具备了“workers”的能力,但这对于单个 python 脚本来说可能有点过大了。