Pra*_*nde 2 publish-subscribe python-3.x google-cloud-platform google-cloud-pubsub
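I have one topic and one subscription with multiple subscribers. My use case is that I want to process messages on different subscribers, a fixed number at a time. That is, suppose 8 messages are being processed: when one of them finishes, it should be acknowledged and the next message should then be fetched from the topic, so that 8 messages are always being processed in the background, and no message should show up as a duplicate on any subscriber.
For this I used the synchronous pull method with max_messages = 8, but the next pull only happens after all of the messages have been processed. So we wrote our own scheduler in which 8 processes run in the background at the same time and each pulls 1 message at a time, but the next message is still delivered only after all 8 messages have been processed.
Here is my code: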
#!/usr/bin/env python3
import logging
import multiprocessing
import time
import sys
import random
from google.cloud import pubsub_v1

project_id = 'xyz'
subscription_name = 'abc'
NUM_MESSAGES = 4
ACK_DEADLINE = 50
SLEEP_TIME = 20

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    logger.info("Received message:{}".format(msg.message.data))
    random_sleep = random.randint(200,800)
    logger.info("Received message:{} for {} sec".format(msg.message.data, random_sleep))
    time.sleep(random_sleep)

def message_puller():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_name)
    while(True):
        try:
            response = subscriber.pull(subscription_path, max_messages=1)
            message = response.received_messages[0]
            msg = message
            ack_id = message.ack_id
            process = multiprocessing.Process(target=worker, args=(message,))
            process.start()
            while process.is_alive():
                # `ack_deadline_seconds` must be between 10 to 600.
                subscriber.modify_ack_deadline(subscription_path,[ack_id],ack_deadline_seconds=ACK_DEADLINE)
                time.sleep(SLEEP_TIME)
            # Final ack.
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("Acknowledging message: {}".format(msg.message.data))
        except Exception as e:
            print (e)
            continue

def synchronous_pull():
    p = []
    for i in range(0,NUM_MESSAGES):
        p.append(multiprocessing.Process(target=message_puller))
    for i in range(0,NUM_MESSAGES):
        p[i].start()
    for i in range(0,NUM_MESSAGES):
        p[i].join()

if __name__ == '__main__':
    synchronous_pull()
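Sometimes subscriber.pull does not pull any message, even though the while loop is always True. It then fails with "list index (0) out of range". My conclusion is that subscriber.pull returns no messages even though there are messages on the topic, and only starts pulling again after a while. Why does this happen?
I have also tried asynchronous pull with flow control, but then I found duplicate messages on multiple subscribers. If any other approach would solve my problem, please let me know. Thanks in advance.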
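Google Cloud PubSub guarantees at-least-once delivery (docs). This means a message may be delivered more than once. To deal with that, you need to make your program/system idempotent.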
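One way to read "idempotent" here is to remember which message IDs have already been handled and skip redeliveries. The following is only a minimal sketch of that idea: `IdempotentProcessor` is a made-up helper, not part of the Pub/Sub client, and the in-memory set only protects a single process. With several subscriber processes you would need a shared store instead (for example a database table with a unique key on the message ID).

```python
import threading


class IdempotentProcessor:
    """Runs a handler at most once per message ID within this process.

    Hypothetical helper for illustration; not part of google-cloud-pubsub.
    """

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()             # IDs already handled (in-memory only)
        self._lock = threading.Lock()

    def process(self, message_id, data):
        """Return True if the handler ran, False if the ID was a duplicate."""
        with self._lock:
            if message_id in self._seen:
                return False
            self._seen.add(message_id)
        self._handler(data)
        return True


results = []
processor = IdempotentProcessor(results.append)
processor.process("id-1", b"hello")    # first delivery, handled
processor.process("id-1", b"hello")    # redelivery of the same ID, skipped
processor.process("id-2", b"world")    # different ID, handled
```

With a real subscriber, the ID would come from `received_message.message.message_id`.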
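You have multiple subscribers, each pulling 8 messages.
To avoid the same message being processed by more than one subscriber, acknowledge the message as soon as a subscriber has pulled it and is about to process it, rather than acknowledging it at the very end, after the whole processing has finished.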
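The "acknowledge first, then process" order could look roughly like the sketch below. It assumes the keyword-argument call form (`subscription=`, `max_messages=`, `ack_ids=`) that, as far as I know, google-cloud-pubsub 2.x accepts; the code in this thread uses the older positional form. `pull_ack_then_process` and `FakeSubscriber` are names invented for this example, and the fake client exists only so the sketch runs without a GCP project.

```python
from types import SimpleNamespace


def pull_ack_then_process(subscriber, subscription_path, handler, max_messages=1):
    """Pull messages, acknowledge them right away, then process them.

    Returns the number of messages handled.
    """
    response = subscriber.pull(subscription=subscription_path, max_messages=max_messages)
    received = list(response.received_messages)
    if not received:
        return 0
    # Ack before processing so Pub/Sub stops redelivering these messages.
    subscriber.acknowledge(
        subscription=subscription_path,
        ack_ids=[m.ack_id for m in received],
    )
    for m in received:
        handler(m.message.data)
    return len(received)


class FakeSubscriber:
    """Stand-in for pubsub_v1.SubscriberClient so the sketch runs offline."""

    def __init__(self, payloads):
        self._queue = [
            SimpleNamespace(ack_id="ack-%d" % i, message=SimpleNamespace(data=p))
            for i, p in enumerate(payloads)
        ]
        self.acked = []

    def pull(self, subscription, max_messages):
        batch = self._queue[:max_messages]
        self._queue = self._queue[max_messages:]
        return SimpleNamespace(received_messages=batch)

    def acknowledge(self, subscription, ack_ids):
        self.acked.extend(ack_ids)


fake = FakeSubscriber([b"a", b"b"])
handled = []
count = pull_ack_then_process(
    fake, "projects/xyz/subscriptions/abc", handled.append, max_messages=8
)
```

The trade-off is that a message acknowledged up front is gone if the worker crashes while processing it, so this only fits workloads that can tolerate losing a message or that record the work somewhere else first.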
Also, instead of running the main script in a tight loop, sleep for a while whenever there are no messages in the queue.
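A sketch of such a loop, with the Pub/Sub call abstracted away: `poll_loop`, `pull_once` and `max_idle_polls` are hypothetical names, and `pull_once` is assumed to return a (possibly empty) list. A synchronous pull can come back empty even when the backlog is not, which is why indexing `received_messages[0]` without a check raises the "list index out of range" error from the question.

```python
import time


def poll_loop(pull_once, handle, idle_sleep=5.0, max_idle_polls=None, sleep=time.sleep):
    """Call pull_once() repeatedly and sleep whenever it returns no messages.

    pull_once: callable returning a (possibly empty) list of messages.
    max_idle_polls: stop after this many consecutive empty polls
                    (None means keep polling forever).
    """
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        messages = pull_once()
        if not messages:               # empty result: back off, do not index [0]
            idle_polls += 1
            sleep(idle_sleep)
            continue
        idle_polls = 0
        for message in messages:
            handle(message)


# Offline demonstration with canned batches instead of a real subscription.
batches = iter([["m1"], [], ["m2", "m3"], [], []])
handled = []
naps = []                              # records sleeps instead of really waiting
poll_loop(lambda: next(batches, []), handled.append,
          idle_sleep=5.0, max_idle_polls=2, sleep=naps.append)
```

In a real script `pull_once` would wrap the subscriber's pull call and `max_idle_polls` would stay `None`.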
I have similar code where I used synchronous pull, but I did not use parallel processing.
Here is the code:
PubSubHandler - class that handles Pubsub-related operations
from google.cloud import pubsub_v1
from google.api_core.exceptions import DeadlineExceeded

class PubSubHandler:

    def __init__(self, subscriber_config):
        self.project_name = subscriber_config['PROJECT_NAME']
        self.subscriber_name = subscriber_config['SUBSCRIBER_NAME']
        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscriber_path = self.subscriber.subscription_path(self.project_name,self.subscriber_name)

    def pull_messages(self,number_of_messages):
        try:
            response = self.subscriber.pull(self.subscriber_path, max_messages = number_of_messages)
            received_messages = response.received_messages
        except DeadlineExceeded as e:
            received_messages = []
            print('No messages caused error')
        return received_messages

    def ack_messages(self,message_ids):
        if len(message_ids) > 0:
            self.subscriber.acknowledge(self.subscriber_path, message_ids)
            return True
Utils - class for util methods
import json

class Utils:

    def __init__(self):
        pass

    def decoded_data_to_json(self,decoded_data):
        try:
            decoded_data = decoded_data.replace("'", '"')
            json_data = json.loads(decoded_data)
            return json_data
        except Exception as e:
            raise Exception('error while parsing json')

    def raw_data_to_utf(self,raw_data):
        try:
            decoded_data = raw_data.decode('utf8')
            return decoded_data
        except Exception as e:
            raise Exception('error converting to UTF')
Orcestrator - main script
import time
import json
import logging
from utils import Utils
# from db_connection import DbHandler  # module not shown in this answer and unused below
from pub_sub_handler import PubSubHandler

# Placeholder values (not in the original answer); replace with your own.
subscriber_config = {
    'PROJECT_NAME': 'your-project-id',
    'SUBSCRIBER_NAME': 'your-subscription-name',
}

class Orcestrator:

    def __init__(self):
        self.MAX_NUM_MESSAGES = 2
        self.SLEEP_TIME = 10
        self.util_methods = Utils()
        self.pub_sub_handler = PubSubHandler(subscriber_config)

    def main_handler(self):
        to_ack_ids = []
        pulled_messages = self.pub_sub_handler.pull_messages(self.MAX_NUM_MESSAGES)
        if len(pulled_messages) < 1:
            self.SLEEP_TIME = 1
            print('no messages in queue')
            return
        logging.info('messages in queue')
        self.SLEEP_TIME = 10
        for message in pulled_messages:
            raw_data = message.message.data
            try:
                decoded_data = self.util_methods.raw_data_to_utf(raw_data)
                json_data = self.util_methods.decoded_data_to_json(decoded_data)
                print(json_data)
            except Exception as e:
                logging.error(e)
            # Collect the ack ID whether or not parsing succeeded.
            to_ack_ids.append(message.ack_id)
        if self.pub_sub_handler.ack_messages(to_ack_ids):
            print('acknowledged msg_ids')

if __name__ == "__main__":
    orecestrator = Orcestrator()
    print('Receiving data..')
    while True:
        orecestrator.main_handler()
        time.sleep(orecestrator.SLEEP_TIME)
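The code above processes messages one batch at a time. For the "always keep 8 in flight, refill a slot as soon as one finishes" behaviour the question asks about, one possible shape is a semaphore in front of a worker pool. This is only a sketch under assumptions: it uses threads rather than the `multiprocessing` processes from the question, and `run_bounded`, `pull_one`, `process` and `ack` are hypothetical callables that you would wire to the subscriber yourself.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_bounded(pull_one, process, ack, max_in_flight=8):
    """Keep up to max_in_flight messages processing at once.

    pull_one: returns the next message, or None when nothing is available.
    A new message is pulled as soon as any worker frees its slot.
    """
    slots = threading.Semaphore(max_in_flight)

    def work(message):
        try:
            process(message)
            ack(message)               # ack only after successful processing
        finally:
            slots.release()            # free the slot even if processing fails

    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        while True:
            slots.acquire()            # blocks while max_in_flight are busy
            message = pull_one()
            if message is None:
                slots.release()
                break                  # a long-running loop would sleep and retry
            pool.submit(work, message)
    # Leaving the "with" block waits for the remaining workers to finish.


# Offline demonstration: 20 fake messages, at most 4 in flight.
queue = list(range(20))
acked = []
stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()


def fake_process(message):
    with stats_lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.01)                   # simulate work
    with stats_lock:
        stats["active"] -= 1


run_bounded(lambda: queue.pop(0) if queue else None,
            fake_process, acked.append, max_in_flight=4)
```

Because each slot is released individually, a slow message only holds up its own slot instead of the whole batch. For CPU-bound work a process pool may be a better fit than threads.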