相关疑难解决方法(0)

如何使zeromq PUB/SUB删除旧邮件而不是新邮件(用于实时订阅源)?

假设我有一台PUB服务器,它zmq_send()SUB客户端的实时消息.如果客户端很忙并且zmq_recv()消息不够快,则消息将在客户端(和/或服务器)中缓冲.

如果缓冲区变得太大(高水位线),则将丢弃NEW消息.对于实时消息,这与人们想要的相反.应该删除旧消息以使其成为新消息.

有办法做到这一点吗?

理想情况下,我希望SUB客户端的接收队列为空或仅包含最新消息.当收到新消息时,它将替换旧消息.(我想这里的问题是客户端会zmq_recv()在队列为空时阻塞,浪费时间这样做.)

那么实时饲料通常如何实施ZeroMQ呢?

zeromq

9
推荐指数
1
解决办法
1173
查看次数

ZMQ 删除旧消息

我正在尝试创建一个现实生活中的系统,其中订阅者需要根据发布者提供的实时数据执行操作。有时,PUB 和 SUB 会不同步(最多 10 秒),因为它们正在执行某些操作,而我总是需要发布者提供的最新数据,否则订阅者执行的操作将会偏离。

我正在尝试使用 SUB/PUB 方法,并且尝试设置 HWM 限制,但它似乎不起作用。我尝试过断开连接方法,但它给系统增加了额外的一秒延迟,并且我的系统 90% 的时间都是实时工作的,因此通过使用断开连接,整个系统会崩溃。

订阅者(我正在尝试通过 time.sleep() 对实际系统进行建模):

import time
import zmq
import random

context = zmq.Context()
consumer_receiver = context.socket(zmq.SUB)

consumer_receiver.set_hwm(0)
consumer_receiver.connect("tcp://127.0.0.1:5555")

consumer_receiver.subscribe(b'')


while 1:
    d=random.randint(0,10)

    work = consumer_receiver.recv_pyobj()
    # consumer_receiver.disconnect()
    print(work,"  :",d)
    time.sleep(d)
Run Code Online (Sandbox Code Playgroud)

出版商:

import time
import zmq

context = zmq.Context()
zmq_socket = context.socket(zmq.PUB)
zmq_socket.bind("tcp://127.0.0.1:5555")

for x in range(1000):

    # zmq_socket.send_string("", zmq.SNDMORE)
    zmq_socket.send_pyobj(x,zmq.NOBLOCK)
    time.sleep(1)
    print(x)
Run Code Online (Sandbox Code Playgroud)

python zeromq pyzmq

5
推荐指数
1
解决办法
1317
查看次数

标签 统计

zeromq ×2

python ×1

pyzmq ×1