ZeroMQ PUB套接字在连接时缓冲所有输出数据

Fan*_*Lin 13 python zeromq pyzmq

我注意到,如果zeromq PUB套接字正在连接,它将缓冲所有传出数据

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()
Run Code Online (Sandbox Code Playgroud)

这些消息之后的子绑定应该被删除,因为如果没有人连接它,PUB应该丢弃消息.但它不是丢弃消息,而是缓冲所有消息.

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
Run Code Online (Sandbox Code Playgroud)

正如您所看到的那样,那些"不应丢弃的消息"由套接字缓冲,一旦连接,它就会将它们刷新到SUB套接字.如果我在PUB套接字绑定,并在SUB套接字连接,那么它可以正常工作.

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())
Run Code Online (Sandbox Code Playgroud)

而你只能看到输出

'hi'
Run Code Online (Sandbox Code Playgroud)

这种奇怪的行为导致问题,它缓冲连接套接字上的所有数据,我有两台服务器,服务器A向服务器B发布数据

Server A -- publish --> Server B
Run Code Online (Sandbox Code Playgroud)

如果服务器B上线,它工作正常.但是,如果我启动服务器A并且不启动服务器B怎么办?

结果,服务器A上的连接PUB套接字保留所有这些数据,内存使用量越来越高.

这是问题,这种行为是一个错误或功能吗?如果是功能,我在哪里可以找到提及此行为的文档?如何停止连接PUB套接字缓冲所有数据?

谢谢.

acu*_*ich 6

套接字阻塞或丢弃消息取决于套接字类型,如ZMQ :: Socket文档中所述(以下重点是我的):

ZMQ :: HWM:获取高水位线

ZMQ :: HWM选项应检索指定套接字的高水位线.高水位标记是对于指定套接字正在与之通信的任何单个对等方,0MQ应在内存中排队的未完成消息的最大数量的硬限制.

如果已达到此限制,则套接字应进入异常状态,并且根据套接字类型,0MQ应采取适当的操作,例如阻止或丢弃已发送的消息.有关为每种套接字类型采取的确切操作的详细信息,请参阅ZMQ :: Socket中的各个套接字描述.

默认的ZMQ :: HWM值为零意味着"无限制".

你可以看它是否会阻止或通过的文件寻找插座类型掉落ZMQ::HWM option action其中任一会BlockDrop.

动作ZMQ::PUBDrop,所以如果它没有掉落你应该检查HWM(高水位标记)值并注意警告,默认ZMQ :: HWM值为零意味着"无限制",这意味着它不会进入异常状态,直到系统内存不足(此时我不知道它的行为).


Pet*_*ter 5

我觉得这种行为是 zmq_connect() 的语义。\n也就是说:当 zmq_connect() 返回成功时,连接在概念上就建立了,因此您的连接 PUB 开始排队消息而不是丢弃

\n\n

以下摘录自“ ZMQ 指南”是对此的提示:

\n\n
\n

理论上,对于 \xc3\x98MQ 套接字,哪一端连接以及哪一端绑定并不重要。但是,对于 PUB-SUB 套接字,如果绑定 SUB\n 套接字并连接 PUB 套接字,则 SUB 套接字可能会收到旧\n 消息,即在 SUB 启动之前发送的消息。这是绑定/连接工作方式的一个工件。如果可以的话,最好绑定 PUB 并连接 SUB。

\n
\n\n

zmq_connect ()中的以下部分有一些提示,如下所示:

\n\n
\n

与传统插座的主要区别

\n\n

一般来说,传统套接字为面向连接的可靠字节流 (SOCK_STREAM) 或无连接的不可靠数据报 (SOCK_DGRAM) 提供同步接口。相比之下,xc3x98MQ 套接字提供了一个抽象异步消息队列,具有精确的排队语义,具体取决于所使用的套接字类型。传统套接字传输字节流或离散数据报,而 \xc3\x98MQ 套接字传输离散消息。

\n\n

xc3\x98MQ 套接字是异步的,这意味着物理连接建立、断开、重新连接和有效传送的时间对用户来说是透明的,并由 xc3\x98MQ 本身组织。此外,如果对等方无法接收消息,则消息可能会排队。

\n
\n