如何在zmq的推/拉模式中设置hwm?

fly*_*yer 1 python zeromq pyzmq

我发现了一个类似的问题,ZeroMQ:PUSH上的HWM不起作用,但它无法解决我的问题.

我想控制推送套接字队列的消息数,但它不起作用,仍然排队1000条消息.
所以我想知道如何设置推插座的hwm.提前致谢.

我的环境是:libzmq 4.0.4,pyzmq 14.1.0,python 3.3

这是我的代码:

server.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import random

import zmq


class TestPush(object):

    def __init__(self):
        self.ctx = zmq.Context()

        random.seed()

    def run(self):
        task_snd = self.ctx.socket(zmq.PUSH)
        task_snd.setsockopt(zmq.SNDHWM, 10)
        task_snd.bind('tcp://*:53000')        

        while True:
            workload = str(random.randint(1, 100))
            task_snd.send(workload.encode('utf-8'))
            print('Send {0}'.format(workload))


if __name__ == '__main__':
    test_push = TestPush()
    test_push.run()
Run Code Online (Sandbox Code Playgroud)

client.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import random

import zmq


class TestPull(object):

    def __init__(self):
        self.ctx = zmq.Context()

    def run(self):
        task_rcv = self.ctx.socket(zmq.PULL)
        task_rcv.setsockopt(zmq.RCVHWM, 1)
        task_rcv.connect('tcp://localhost:53000')

        while True:
            msg = task_rcv.recv()
            print('Receive msg: {0}'.format(msg))

            time.sleep(random.randint(2, 3))


if __name__ == '__main__':
    test_pull = TestPull()
    test_pull.run()
Run Code Online (Sandbox Code Playgroud)

小智 11

当我试图在推拉式插座上设置HWM(高水位标记)时,我遇到了与ZeroMQ类似的问题.甚至,它不适用于pub和sub socket.我想解释一下发生了什么以及它是如何解决的.

我制作了2个脚本,首先是带有插座的发送器,另一个是带拉插座的接收器.在两个套接字上将HWM设置为10.在接收器脚本内部,我收到每条消息后延迟5秒.然后我用100个消息的循环运行发送者脚本(在保持接收器正在运行以接收之后).

我所期待的:

接收方队列然后发送方的队列将到达hwm.之后,发件人将停止发送更多邮件.

但是发生了什么:

发件人发送所有100条消息并退出.但是接收器一直保持处理消息很长一段时间,直到收到所有消息.

经过研究,我发现了原因:

有一种称为内核套接字缓冲区的东西位于发送方套接字和接收方套接字之间.每当进程打开一个套接字时,内核就会将内存空间分配给tcp套接字,默认情况下为128KB.内核套接字缓冲区适用于发送方和接收方套接字(因此总缓冲区将为128KB + 128KB).我的邮件大小以字节为单位(带有一些字符的int).因此,总消息缓冲将如下:

总缓冲区消息=发送方套接字hwm +发送方套接字的内核套接字缓冲区(128KB)+接收方套接字hwm +发送方套接字的内核套接字缓冲区(128KB)

解:

现在,我将消息长度改为1KB以上.然后再次执行测试,发现大约260条消息发送(如预期的那样),此后发送方停止间隔,直到接收方收到一些消息并再次启动.


附加信息

为了使推送套接字即使在接收器无法接收时也能继续发送消息,我们可以在发送例程中使用NOBLOCK选项,但接收器丢失的消息数量会增加很多.因此,更好的选择是使用轮询或超时,然后使用NOBLOCK选项调用send例程.

请注意,您可以使用zeromq的SNDBUFF/RCVBUFF,但OS可能不遵守它(因为在我的情况下它不起作用).