如何在python中限制ZMQ(ZeroMQ - PyZMQ)队列缓冲区大小?

Ben*_*ari 5 python sockets zeromq pyzmq

我正在使用pyzmq带有pub/sub模式的库。我有一些快速的 ZMQ 发布.connect()方法和一个较慢的 ZMQ 订阅.bind()方法。然后几分钟后,我的订阅者从发布者那里得到旧的发布数据(由于 ZMQ 缓冲区)。


我的问题:

有没有办法管理ZMQ 队列缓冲区大小?(设置有限的缓冲区)

[注意]:

  • 我不想使用 ZMQ 推/拉。
  • 我读过这篇文章,但这种方法只清除缓冲区:清除 ZMQ 缓冲区
  • 我也尝试过high watermark选项,但没有用:
socket.setsockopt(zmq.RCVHWM, 10)  # not working
socket.setsockopt(zmq.SNDHWM, 10)  # not working
Run Code Online (Sandbox Code Playgroud)

出版商:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
socket.setsockopt(zmq.SNDHWM, 10)  # not working

while True:
    data = time.time()
    print("%d" % data)
    socket.send("%d" % data)
    time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

订户:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.RCVHWM, 10)  # not working

while 1:
    time.sleep(2)  # A speed reducer like.
    data = socket.recv()
    print(data)
Run Code Online (Sandbox Code Playgroud)

即使使用这些选项,队列大小仍然超过 10(使用配置send/receive high watermark)。

Ben*_*ari 7

我找到了一种仅ZMQ订阅者中使用选项获取最后一条消息的方法CONFLATE

CONFLATE请注意,您应该在连接之前设置该选项:

import zmq
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.SUB)

socket.setsockopt(zmq.SUBSCRIBE, '')
socket.setsockopt(zmq.CONFLATE, 1)  # last msg only.
socket.connect("tcp://localhost:%s" % port)  # must be placed after above options.

while 1:
    time.sleep(2)  # Dummy delay
    data = socket.recv()
    print(data)
Run Code Online (Sandbox Code Playgroud)

换句话说,它删除订阅者端的所有缓冲队列。


[笔记]:

此外,通过使用zmq.SNDBUFzmq.RCVBUF选项,我们可以设置 ZMQ 缓冲区大小的限制。(更多信息


  • 您说“您应该在连接之后**设置 CONFLATE 选项”,但在代码片段中,您在连接之前**设置 CONFLATE 选项。我认为代码是正确的,句子应该是**before**。 (3认同)

jam*_*vey 0

要设置队列/缓冲区大小,您需要通过套接字选项设置高水位线

setsockopt(zmq.SNDHWM, 10)
setsockopt(zmq.RCVHWM, 10)
Run Code Online (Sandbox Code Playgroud)

  • 真的没有发生任何改变。 (2认同)