Phi*_*ppe 3 python wsgi zeromq flask pyzmq
使用此代码,我总是丢失消息:
def publish(frontend_url, message):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect(frontend_url)
socket.send(message)
Run Code Online (Sandbox Code Playgroud)
但是,如果我引入一个短睡眠(),我可以得到消息:
def publish(frontend_url, message):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect(frontend_url)
time.sleep(0.1) # wait for the connection to be established
socket.send(message)
Run Code Online (Sandbox Code Playgroud)
有没有办法确保在调用 connect() 和 send() 之间不休眠的情况下传递消息?
恐怕我无法预测睡眠时长(网络延迟等)
更新:
上下文:我想将数据更新从Flask REST 应用程序发布到消息代理(例如,在资源创建/更新/删除时)。
目前,消息代理是使用 0mq FORWARDER 设备起草的
我知道 0mq 旨在抽象 TCP 套接字和消息传递的复杂性。
在连接长期存在的情况下,我可以使用它。但是,当在 gunicorn 或 uwsgi 等应用程序容器中运行我的 Flask 应用程序时,我有 N 个工作进程,我不能指望连接和进程是长期存在的。
据我了解,我应该使用真正的消息代理(如 RabbitMQ)并使用同步客户端在那里发布消息。
您不能完全做到这一点,但可能有其他解决方案可以解决您的问题。
你为什么使用PUB/SUB套接字?pub/sub 的性质更适合长时间运行的套接字,通常你会bind()在PUB套接字上并在套接字上连接SUB。你在这里所做的,旋转一个套接字来发送一条消息,大概是某种“服务器”,并不太适合这个PUB/SUB范式。
如果您改为选择REQor DEALERto REPor 的一些变体ROUTER,那么事情可能会更顺利。一个REQ套接字将保存一条消息,直到它的一对准备好接收它。如果您不是特别关心“服务器”的响应,那么您可以将其丢弃。
有什么特别的原因你不只是让套接字保持打开状态,而不是构建一个全新的上下文和套接字,并在每次想要发送消息时重新连接?我可以想到一些有限的场景,在这些场景中这可能是首选行为,但通常让套接字保持打开状态是一个更好的主意。如果您想坚持使用PUB/SUB,那么只需在您的应用程序开始时启动套接字,休眠一段涵盖任何合理延迟情况的安全时间段,然后开始发送您的消息,而不必担心每次都重新连接。如果您将这个套接字长时间保持打开状态而没有任何新消息,您可能需要使用心跳来确保连接保持打开状态。
| 归档时间: |
|
| 查看次数: |
1503 次 |
| 最近记录: |