我有一个主题和一个订阅多个订阅者。我的应用场景是我想处理不同订阅者上的消息,一次处理特定数量的消息。意味着首先假设 8 条消息正在处理,然后如果一条消息处理完成,那么在确认处理的消息后,下一条消息应该从主题中获取,同时注意在任何订阅者上都找不到重复的消息,并且每次 8 条消息都应该在后台处理。
为此,我使用了 max_messages = 8 的同步拉取方法,但在所有消息处理完成后才完成下一次拉取。因此,我们创建了自己的调度程序,同时 8 个进程应该在后台运行并一次拉取 1 条消息,但在所有 8 条消息处理完成后仍会传递下一条消息。
这是我的代码:
#!/usr/bin/env python3
import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1
project_id = 'xyz'
subscription_name = 'abc'
NUM_MESSAGES = 4
ACK_DEADLINE = 50
SLEEP_TIME = 20
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
def worker(msg):
logger.info("Received message:{}".format(msg.message.data))
random_sleep = random.randint(200,800)
logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
time.sleep(random_sleep)
def message_puller():
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_name)
while(True):
try:
response = …Run Code Online (Sandbox Code Playgroud) publish-subscribe python-3.x google-cloud-platform google-cloud-pubsub