Ben*_*ari 5 python sockets zeromq pyzmq
我正在使用pyzmq带有pub/sub模式的库。我有一些快速的 ZMQ 发布者.connect()方法和一个较慢的 ZMQ 订阅者.bind()方法。然后几分钟后,我的订阅者从发布者那里得到旧的发布数据(由于 ZMQ 缓冲区)。
有没有办法管理ZMQ 队列缓冲区大小?(设置有限的缓冲区)
[注意]:
high watermark选项,但没有用:Run Code Online (Sandbox Code Playgroud)socket.setsockopt(zmq.RCVHWM, 10) # not working socket.setsockopt(zmq.SNDHWM, 10) # not working
出版商:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10) # not working
while True:
data = time.time()
print("%d" % data)
socket.send("%d" % data)
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
订户:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10) # not working
while 1:
time.sleep(2) # A speed reducer like.
data = socket.recv()
print(data)
Run Code Online (Sandbox Code Playgroud)
即使使用这些选项,队列大小仍然超过 10(使用配置send/receive high watermark)。
我找到了一种仅在ZMQ订阅者中使用选项获取最后一条消息的方法CONFLATE。
CONFLATE请注意,您应该在连接之前设置该选项:
import zmq
import time
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1) # last msg only.
socket.connect("tcp://localhost:%s" % port) # must be placed after above options.
while 1:
time.sleep(2) # Dummy delay
data = socket.recv()
print(data)
Run Code Online (Sandbox Code Playgroud)
换句话说,它删除订阅者端的所有缓冲队列。
[笔记]:
此外,通过使用zmq.SNDBUF和zmq.RCVBUF选项,我们可以设置 ZMQ 缓冲区大小的限制。(更多信息)
要设置队列/缓冲区大小,您需要通过套接字选项设置高水位线
setsockopt(zmq.SNDHWM, 10)
setsockopt(zmq.RCVHWM, 10)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6058 次 |
| 最近记录: |