小编Pra*_*nde的帖子

谷歌云pubsub python同步拉取

我有一个主题和一个订阅多个订阅者。我的应用场景是我想处理不同订阅者上的消息,一次处理特定数量的消息。意味着首先假设 8 条消息正在处理,然后如果一条消息处理完成,那么在确认处理的消息后,下一条消息应该从主题中获取,同时注意在任何订阅者上都找不到重复的消息,并且每次 8 条消息都应该在后台处理。

为此,我使用了 max_messages = 8 的同步拉取方法,但在所有消息处理完成后才完成下一次拉取。因此,我们创建了自己的调度程序,同时 8 个进程应该在后台运行并一次拉取 1 条消息,但在所有 8 条消息处理完成后仍会传递下一条消息。

这是我的代码:

    #!/usr/bin/env python3

    import logging
    import multiprocessing
    import time
    import sys
    import random
    from google.cloud import pubsub_v1

    project_id = 'xyz'
    subscription_name = 'abc'

    NUM_MESSAGES = 4
    ACK_DEADLINE = 50
    SLEEP_TIME = 20

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        logger.info("Received message:{}".format(msg.message.data))
        random_sleep = random.randint(200,800)
        logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
        time.sleep(random_sleep)

    def message_puller():
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        while(True):
            try:
                response = …
Run Code Online (Sandbox Code Playgroud)

publish-subscribe python-3.x google-cloud-platform google-cloud-pubsub

2
推荐指数
1
解决办法
1512
查看次数