(py)zmq/PUB :是否可以立即调用 connect() 然后 send() 并且不会丢失消息?

Phi*_*ppe 3 python wsgi zeromq flask pyzmq

使用此代码,我总是丢失消息:

def publish(frontend_url, message):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(frontend_url)
    socket.send(message)
Run Code Online (Sandbox Code Playgroud)

但是,如果我引入一个短睡眠(),我可以得到消息:

def publish(frontend_url, message):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(frontend_url)
    time.sleep(0.1)  # wait for the connection to be established 
    socket.send(message)
Run Code Online (Sandbox Code Playgroud)

有没有办法确保在调用 connect() 和 send() 之间不休眠的情况下传递消息?

恐怕我无法预测睡眠时长(网络延迟等)

更新:

上下文:我想将数据更新从Flask REST 应用程序发布到消息代理(例如,在资源创建/更新/删除时)。

目前,消息代理是使用 0mq FORWARDER 设备起草的

我知道 0mq 旨在抽象 TCP 套接字和消息传递的复杂性。

在连接长期存在的情况下,我可以使用它。但是,当在 gunicorn 或 uwsgi 等应用程序容器中运行我的 Flask 应用程序时,我有 N 个工作进程,我不能指望连接和进程是长期存在的。

据我了解,我应该使用真正的消息代理(如 RabbitMQ)并使用同步客户端在那里发布消息。

Jas*_*son 5

您不能完全做到这一点,但可能有其他解决方案可以解决您的问题。

你为什么使用PUB/SUB套接字?pub/sub 的性质更适合长时间运行的套接字,通常你会bind()PUB套接字上并在套接字上连接SUB。你在这里所做的,旋转一个套接字来发送一条消息,大概是某种“服务器”,并不太适合这个PUB/SUB范式。

如果您改为选择REQor DEALERto REPor 的一些变体ROUTER,那么事情可能会更顺利。一个REQ套接字将保存一条消息,直到它的一对准备好接收它。如果您不是特别关心“服务器”的响应,那么您可以将其丢弃。

有什么特别的原因你不只是让套接字保持打开状态,而不是构建一个全新的上下文和套接字,并在每次想要发送消息时重新连接?我可以想到一些有限的场景,在这些场景中这可能是首选行为,但通常让套接字保持打开状态是一个更好的主意。如果您想坚持使用PUB/SUB,那么只需在您的应用程序开始时启动套接字,休眠一段涵盖任何合理延迟情况的安全时间段,然后开始发送您的消息,而不必担心每次都重新连接。如果您将这个套接字长时间保持打开状态而没有任何新消息,您可能需要使用心跳来确保连接保持打开状态。