Google PubSub 和自动缩放计算引擎实例 (Python)

Mat*_*ias 5 python google-compute-engine docker google-cloud-platform google-cloud-pubsub

在我的场景中,我正在使用 PubSub 安排任务。这是多达 2.000 条 PubSub 消息,这些消息由在 Google Compute Engine 内的 Docker 容器内运行的 Python 脚本使用。该脚本使用 PubSub 消息。

每条消息的处理时间约为 30 秒到 5 分钟。因此,确认截止时间为 600 秒(10 分钟)。

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    # do your stuff here (max. 600sec)
    message.ack()
    return

def receive_messages(project, subscription_name):

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)

    future = subscription.open(handle_message)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    try:
        future.result()
    except Exception as e:
        # do some logging
        raise
Run Code Online (Sandbox Code Playgroud)

因为我正在处理如此多的 PubSub 消息,所以我正在为计算引擎创建一个模板,该模板以以下两种方式之一使用自动缩放

gcloud compute instance-groups managed create my-worker-group \
  --zone=europe-west3-a \
  --template=my-worker-template \
  --size=0

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --target-cpu-utilization=0.4

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --update-stackdriver-metric=pubsub.googleapis.com/subscription/num_undelivered_messages \
  --stackdriver-metric-filter="resource.type = pubsub_subscription AND resource.label.subscription_id = my-pubsub-subscription" \
  --stackdriver-metric-single-instance-assignment=10
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好。选项一可扩展到大约 8 个实例,而第二个选项将启动最大数量的实例。现在我发现发生了一些奇怪的事情,这就是我在这里发帖的原因。也许你能帮我一把?!

消息重复:似乎每个实例中的 PubSub 服务(计算引擎中 docker 容器内的 Python 脚本)读取一批消息(~10),有点像缓冲区,并将它们提供给我的代码。看起来所有同时启动的实例都将读取所有相同的消息(2.000 的前 10 个)并且将开始处理相同的内容。在我的日志中,我看到大多数消息被不同的机器处理了 3 次。我期望 PubSub 知道某个订阅者是否缓冲了 10 条消息,以便另一个订阅者缓冲 10 条不同的消息而不是相同的消息。

确认截止日期:由于缓冲,到达缓冲区末尾的消息(假设消息 8 或 9)必须在缓冲区中等待,直到前面的消息(消息 1 到 7)被处理。该等待时间加上它自己的处理时间的总和可能会达到 600 秒的超时。

负载均衡:因为每台机器缓冲这么多消息,负载只被少数实例消耗,而其他实例完全空闲。这发生在使用 PubSub stackdriver 指标的缩放选项二中。

人们告诉我,我需要使用 Cloud SQL 或其他东西来实现手动同步服务,其中每个实例都指示它正在处理的消息,以便其他实例不会启动。但我觉得这不可能是真的——因为那样我就不知道 PubSub 是什么了。

发布订阅行为

更新:我找到了Gregor Hohpe 的一个很好的解释,他是 2015 年企业集成模式一书的合著者。实际上我的观察是错误的,但观察到的副作用是真实的。

Google Cloud Pub/Sub API 实际上实现了发布-订阅频道和竞争消费者模式。Cloud Pub/Sub 的核心是一个经典的发布订阅频道,它将发布到它的单个消息传递给多个订阅者。这种模式的一个优点是添加订阅者没有副作用,这也是发布订阅频道有时被认为比点对点频道更松散耦合的原因之一,点对点频道只将消息传递给一个订阅者。将消费者添加到点对点渠道会导致竞争消费者,因此会产生强烈的副作用。

版权所有:Gregor Hohpe,企业集成模式一书的合著者。 2015 年。

我观察到的副作用是关于每个订阅者(订阅相同订阅,点对点 == 竞争消费者)中的消息缓冲和消息流控制。当前版本的 Python Client Lib 包装了 PubSub REST API(和 RPC)。如果使用该包装器,则无法控制:

  • 一台虚拟机上启动了多少容器;如果 CPU 尚未完全使用,可能会启动多个容器
  • 一次从订阅中提取多少条消息(缓冲);完全没有控制
  • 在容器内部启动了多少线程,用于处理拉取的消息;如果值低于固定值,则 flow_control(max_messages) 无效。

我们观察到的副作用是:

  1. 一个消费者一次拉取大量消息(大约 100 到 1.000)并将它们排在其客户端缓冲区中。因此,根据自动缩放规则启动的所有其他 VM 不会收到任何消息,因为所有消息都在前几个 VM 的队列中
  2. 如果消息在确认截止日期前到达,消息将重新传送到同一个 VM 或任何其他 VM(或 docker 容器)。因此,您需要在处理消息时修改确认截止时间。截止时间计数器在处理开始时开始。
  3. 假设消息的处理是一个长时间运行的任务(例如机器学习),你可能
    • 预先确认消息,但如果没有进一步的消息等待,这将导致 VM 被自动缩放规则关闭。该规则不关心 CPU 利用率是否仍然很强并且处理尚未完成。
    • 处理后确认消息。在这种情况下,您需要在处理该消息时修改该特定消息的确认期限。自上次修改以来,不得有一个代码块打破了截止日期。

尚未研究的可能解决方案:

  • 使用 Java 客户端库,因为它对拉取和使用消息具有更好的控制
  • 使用 Python 客户端库的底层 API 调用和类
  • 构建协调竞争消费者的同步存储

小智 1

我认为有两种主要方法可以解决这个问题。

1) 不要直接推送到工作进程,而是推送到负载均衡器。

或者

2) 让您的工作进程拉取请求,而不是将请求推送给工作进程。

请参阅“推拉交付”下的“负载平衡”部分

https://cloud.google.com/pubsub/docs/subscriber