试图了解zeromq高水印行为

The*_*One 5 python multithreading zeromq pyzmq

我一直在玩pyzmq和使用HWM进行简单的负载平衡,但我不太了解所看到的行为。

我建立了一个简单的多线程测试,将DEALER客户端通过ROUTER to DEALER模式连接到两个工作程序。HWM设置为1。其中一个工作进程非常快,另一个工作进程非常慢,客户端所做的只是向服务器发送100条垃圾邮件。通常看来,这是可行的,并且速度较快的工作者比速度较慢的工作者处理更多的消息。

但是,即使我将慢工作程序设置为如此慢,以至于快工作程序应该能够在慢工作程序完成一个消息之前就处理99条消息,但慢工作程序似乎仍会接收至少2或3条消息。

高水位标记的行为不准确还是我缺少了什么?

服务器代码如下:

import re, sys, time, string, zmq, threading, signal


def worker_routine(worker_url, worker_id, context=None):
    # socket to talk to dispatcher
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REP)
    socket.set_hwm(1)
    socket.connect(worker_url)

    print "worker ", worker_id, " ready ..."
    while True:
        x = socket.recv()

        if worker_id==1:
            time.sleep(3)

        print worker_id, x
        sys.stdout.flush()

        socket.send(b'world')


context = zmq.Context().instance()
# socket facing clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# socket facing services
backend  = context.socket(zmq.DEALER)
url_worker = "inproc://workers"
backend.set_hwm(1)
backend.bind(url_worker)

# launch pool of worker threads
for i in range(2):
    thread = threading.Thread(target=worker_routine, args=(url_worker,i,))
    thread.start()
    time.sleep(0.1)

try:
    zmq.device(zmq.QUEUE, frontend, backend)
except:
    print "terminating!"

# we never get here
frontend.close()
backend.close()
context.term()
Run Code Online (Sandbox Code Playgroud)

客户端代码如下:

import zmq, random, string, time, threading, signal

#  prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5559")

inputs = [''.join(random.choice(string.ascii_lowercase) for x in range(12)) for y in range(100)]

for x in xrange(100):
    socket.send_multipart([b'', str(x)])

print "finished!"
Run Code Online (Sandbox Code Playgroud)

输出示例:

...
0 81
0 82
0 83
0 84
0 85
0 86
0 87
0 88
0 89
0 90
0 91
0 92
0 93
0 94
0 95
0 96
0 97
0 98
0 99
1 1
1 3
1 5
Run Code Online (Sandbox Code Playgroud)

aaa*_*210 4

显然 ZeroMQ 从您的 send() 调用异步发送消息。也就是说,当send()返回时,消息还没有被发送,或者添加到内部队列中。如果发送速度足够快,下次调用send时,消息仍然没有被添加到队列中,因此还没有达到水印。您可能会在某些消息进入队列之前添加数十或数百条消息,达到水位线,并且阻塞发送行为开始。

换句话说,尝试在 send() 之后休眠一小会儿,看看会发生什么,它应该有足够的时间将消息添加到队列中,因此到下一次发送时,它能够看到已达到水位线。