Google Cloud Pub/Sub Python synchronous pull

Pra*_*nde 2 publish-subscribe python-3.x google-cloud-platform google-cloud-pubsub

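I have one topic and one subscription with multiple subscribers. My use case is that I want to process messages on different subscribers, with a fixed number of messages in flight at a time. Say 8 messages are being processed at first: as soon as one of them finishes and has been acknowledged, the next message should be fetched from the topic, so that 8 messages are always being processed in the background. No message should show up as a duplicate on any subscriber.

For this I used the synchronous pull method with max_messages = 8, but the next pull only happens after all of the messages have been processed. So we built our own scheduler, in which 8 processes run in the background at the same time and each pulls 1 message at a time, but the next message is still only delivered after all 8 messages have been processed.

Here is my code: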

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        logger.info("Received message:{}".format(msg.message.data))
        random_sleep = random.randint(200,800)
        logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while(True):
            try:
                response = subscriber.pull(subscription_path, max_messages=1)
                message = response.received_messages[0]
                msg = message
                ack_id = message.ack_id
                process = multiprocessing.Process(target=worker, args=(message,))
                process.start()
                while process.is_alive():
                    # `ack_deadline_seconds` must be between 10 to 600.
                    subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
                    time.sleep(SLEEP_TIME)
                # Final ack.
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info("Acknowledging message: {}".format(msg.message.data))
            except Exception as e:
                print (e)
                continue

    def synchronous_pull():
        p = []
        for i in range(0,NUM_MESSAGES):
            p.append(multiprocessing.Process(target=message_puller))

        for i in range(0,NUM_MESSAGES):
            p[i].start()

        for i in range(0,NUM_MESSAGES):
            p[i].join()

    if __name__ == '__main__':
        synchronous_pull()

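Sometimes subscriber.pull does not pull any messages, even though the while loop is always True. It gives me the error "list index (0) out of range", which means subscriber.pull returned no messages even though there are messages on the topic; after some time it starts pulling again. Why does this happen?

I have tried asynchronous pull with flow control, but I found duplicate messages across multiple subscribers. If any other approach would solve my problem, please let me know. Thanks in advance.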

sai*_*ero 5

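Google Cloud Pub/Sub guarantees at-least-once delivery (docs). This means a message may be delivered more than once. To deal with this, you need to make your program/system idempotent.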
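As a rough illustration of what "idempotent" can look like, here is a minimal sketch that skips a message whose ID has already been handled. The `IdempotentProcessor` class and its in-memory set are assumptions made for this example; with several subscriber processes the set of seen IDs would have to live in a shared store (for instance a database table with a unique key), which is not shown here.

```python
import threading


class IdempotentProcessor:
    """Runs a handler at most once per message ID (in-memory sketch only)."""

    def __init__(self, handler):
        self._handler = handler      # application-specific callable
        self._seen = set()           # IDs that were already handled
        self._lock = threading.Lock()

    def process(self, message_id, data):
        """Return True if the handler ran, False if the ID was a duplicate."""
        with self._lock:
            if message_id in self._seen:
                return False
            self._seen.add(message_id)
        self._handler(data)
        return True
```

With the Pub/Sub client, the ID would typically come from `received_message.message.message_id` and the payload from `received_message.message.data`.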

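You have multiple subscribers, each pulling 8 messages.
To avoid the same message being processed by multiple subscribers, acknowledge the message as soon as a subscriber pulls it and before it goes on to further processing, rather than acknowledging it at the end, after the entire processing of the message is over.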
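The ordering described above can be sketched as follows. The function `pull_ack_then_process` and its three callables are hypothetical names for this example, not part of the Pub/Sub client; in real code `pull` and `ack` would wrap `subscriber.pull` and `subscriber.acknowledge`. Keep in mind the trade-off: once a message is acknowledged, it will not be redelivered if the worker crashes while processing it.

```python
def pull_ack_then_process(pull, ack, process, max_messages=8):
    """Pull a batch, acknowledge it immediately, then process it.

    pull(max_messages) -> list of objects with an `ack_id` attribute
    ack(ack_ids)       -> acknowledges the given ack IDs
    process(message)   -> application-specific work
    Returns the number of messages handled.
    """
    received = pull(max_messages)
    if not received:
        return 0  # an empty batch is normal for synchronous pull

    # Acknowledge first, so the messages are not handed to another subscriber
    # while this one is still busy with them.
    ack([m.ack_id for m in received])

    for m in received:
        process(m)
    return len(received)
```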

Also, instead of running the main script continuously, sleep for a fixed amount of time when there are no messages in the queue.
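A minimal sketch of such a loop, assuming a hypothetical `pull_once` callable that returns a (possibly empty) list of messages. The `max_iterations` and `sleep` parameters are only there to make the loop easy to test and are not something the Pub/Sub client provides. Checking for an empty list before touching the messages also avoids the "list index out of range" error from the question.

```python
import time


def poll(pull_once, handle, idle_sleep=10.0, max_iterations=None, sleep=time.sleep):
    """Pull repeatedly; sleep only when a pull comes back empty.

    Returns the total number of messages passed to `handle`.
    """
    iterations = 0
    handled = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        messages = pull_once()
        if not messages:
            # Nothing was delivered this time: back off instead of spinning.
            sleep(idle_sleep)
            continue
        for message in messages:
            handle(message)
            handled += 1
    return handled
```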

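I had similar code where I used synchronous pull, except that I did not use parallel processing.

Here is the code:

PubSubHandler - class to handle Pub/Sub related operations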

from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded


class PubSubHandler:

    def __init__(self, subscriber_config):

        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']

        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)


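    def pull_messages(self,number_of_messages):

        try:
            # Positional-argument call style of google-cloud-pubsub releases
            # before 2.0; 2.x expects keyword arguments or a request dict.
            response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded:
            # No message arrived before the request deadline expired.
            received_messages = []
            print('No messages caused error')
        return received_messages


    def ack_messages(self,message_ids):

        # message_ids holds the ack_id values of the pulled messages.
        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True
        return False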


Utils - class for utility methods

import json

class Utils:


    def __init__(self):
        pass


    def decoded_data_to_json(self,decoded_data):
        try:
            decoded_data = decoded_data.replace("'", '"')
            json_data = json.loads(decoded_data)
            return json_data
        except Exception as e:
            raise Exception('error while parsing json')


    def raw_data_to_utf(self,raw_data):
        try:
            decoded_data = raw_data.decode('utf8')
            return decoded_data
        except Exception as e:
            raise Exception('error converting to UTF')
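One caveat with `decoded_data_to_json`: replacing every single quote breaks payloads that legitimately contain an apostrophe. A possible alternative, sketched here under the assumption that the publisher sends either real JSON or the `str()` of a Python dict, is to try `json.loads` first and fall back to `ast.literal_eval`. The `parse_payload` name is made up for this example.

```python
import ast
import json


def parse_payload(raw_data):
    """Decode a Pub/Sub payload (bytes) into a Python object."""
    text = raw_data.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Fallback for payloads published as str(dict), e.g. "{'a': 1}".
        # literal_eval only accepts Python literals; it does not run code.
        return ast.literal_eval(text)
```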

Orcestrator - the main script


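import time
import logging

from utils import Utils
from pub_sub_handler import PubSubHandler

# Placeholder configuration: the answer does not show where subscriber_config
# comes from, so these values are assumptions to be replaced with your own.
subscriber_config = {
    'PROJECT_NAME': 'your-project-id',
    'SUBSCRIBER_NAME': 'your-subscription-name',
}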

class Orcestrator:

    def __init__(self):

        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        self.pub_sub_handler = PubSubHandler(subscriber_config)


    def main_handler(self):
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)

        if len(pulled_messages) < 1:
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return

        logging.info('messages in queue')
        self.SLEEP_TIME = 10

        for message in pulled_messages:
            raw_data = message.message.data
            try: 
                decoded_data = self.util_methods.raw_data_to_utf(raw_data)  
                json_data = self.util_methods.decoded_data_to_json(decoded_data)
                print(json_data)

            except Exception as e:
                logging.error(e)
            to_ack_ids.append(message.ack_id)

        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')


if __name__ == "__main__":

    orecestrator = Orcestrator()
    print('Receiving data..')
    while True:
        orecestrator.main_handler()
        time.sleep(orecestrator.SLEEP_TIME)
