Fan*_*Lin 13 python zeromq pyzmq
我注意到,如果zeromq PUB套接字正在连接,它将缓冲所有传出数据
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print sub.recv()
Run Code Online (Sandbox Code Playgroud)
这些消息之后的子绑定应该被删除,因为如果没有人连接它,PUB应该丢弃消息.但它不是丢弃消息,而是缓冲所有消息.
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
Run Code Online (Sandbox Code Playgroud)
正如您所看到的那样,那些"不应丢弃的消息"由套接字缓冲,一旦连接,它就会将它们刷新到SUB套接字.如果我在PUB套接字绑定,并在SUB套接字连接,那么它可以正常工作.
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print repr(sub.recv())
Run Code Online (Sandbox Code Playgroud)
而你只能看到输出
'hi'
Run Code Online (Sandbox Code Playgroud)
这种奇怪的行为导致问题,它缓冲连接套接字上的所有数据,我有两台服务器,服务器A向服务器B发布数据
Server A -- publish --> Server B
Run Code Online (Sandbox Code Playgroud)
如果服务器B上线,它工作正常.但是,如果我启动服务器A并且不启动服务器B怎么办?
结果,服务器A上的连接PUB套接字保留所有这些数据,内存使用量越来越高.
这是问题,这种行为是一个错误或功能吗?如果是功能,我在哪里可以找到提及此行为的文档?如何停止连接PUB套接字缓冲所有数据?
谢谢.
套接字阻塞或丢弃消息取决于套接字类型,如ZMQ :: Socket文档中所述(以下重点是我的):
ZMQ :: HWM:获取高水位线
ZMQ :: HWM选项应检索指定套接字的高水位线.高水位标记是对于指定套接字正在与之通信的任何单个对等方,0MQ应在内存中排队的未完成消息的最大数量的硬限制.
如果已达到此限制,则套接字应进入异常状态,并且根据套接字类型,0MQ应采取适当的操作,例如阻止或丢弃已发送的消息.有关为每种套接字类型采取的确切操作的详细信息,请参阅ZMQ :: Socket中的各个套接字描述.
默认的ZMQ :: HWM值为零意味着"无限制".
你可以看它是否会阻止或通过的文件寻找插座类型掉落ZMQ::HWM option action其中任一会Block或Drop.
动作ZMQ::PUB是Drop,所以如果它没有掉落你应该检查HWM(高水位标记)值并注意警告,默认ZMQ :: HWM值为零意味着"无限制",这意味着它不会进入异常状态,直到系统内存不足(此时我不知道它的行为).
我觉得这种行为是 zmq_connect() 的语义。\n也就是说:当 zmq_connect() 返回成功时,连接在概念上就建立了,因此您的连接 PUB 开始排队消息而不是丢弃。
\n\n以下摘录自“ ZMQ 指南”是对此的提示:
\n\n\n\n\n理论上,对于 \xc3\x98MQ 套接字,哪一端连接以及哪一端绑定并不重要。但是,对于 PUB-SUB 套接字,如果绑定 SUB\n 套接字并连接 PUB 套接字,则 SUB 套接字可能会收到旧\n 消息,即在 SUB 启动之前发送的消息。这是绑定/连接工作方式的一个工件。如果可以的话,最好绑定 PUB 并连接 SUB。
\n
zmq_connect ()中的以下部分有一些提示,如下所示:
\n\n\n\n与传统插座的主要区别
\n\n一般来说,传统套接字为面向连接的可靠字节流 (SOCK_STREAM) 或无连接的不可靠数据报 (SOCK_DGRAM) 提供同步接口。相比之下,xc3x98MQ 套接字提供了一个抽象异步消息队列,具有精确的排队语义,具体取决于所使用的套接字类型。传统套接字传输字节流或离散数据报,而 \xc3\x98MQ 套接字传输离散消息。
\n\nxc3\x98MQ 套接字是异步的,这意味着物理连接建立、断开、重新连接和有效传送的时间对用户来说是透明的,并由 xc3\x98MQ 本身组织。此外,如果对等方无法接收消息,则消息可能会排队。
\n