zeromq:如何防止无限期等待?

Jes*_*ose 65 python zeromq

我刚刚开始使用ZMQ.我正在设计一个应用程序的工作流程为:

  1. 许多客户端之一(具有随机PULL地址)在5555向服务器发送请求
  2. 服务器永远等待客户端PUSHes.当一个人来时,会针对该特定请求生成一个工作进程.是的,工作进程可以同时存在.
  3. 当该过程完成它的任务时,它会将结果推送给客户端.

我假设PUSH/PULL架构适用于此.请纠正我.


但是我该如何处理这些情况呢?

  1. 当服务器无法响应时,client_receiver.recv()将等待无限时间.
  2. 客户端可以发送请求,但它会在之后立即失败,因此工作进程将永远停留在server_sender.send().

那么如何设置PUSH/PULL模型中的超时


编辑:感谢user938949的建议,我得到了一个有效的答案,我正在为后代分享它.

min*_*nrk 75

如果您使用的是zeromq> = 3.0,则可以设置RCVTIMEO套接字选项:

client_receiver.RCVTIMEO = 1000 # in milliseconds
Run Code Online (Sandbox Code Playgroud)

但一般来说,你可以使用pollers:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
Run Code Online (Sandbox Code Playgroud)

poller.poll()暂停:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
Run Code Online (Sandbox Code Playgroud)

evts 如果没有什么可以收到,将是一个空列表.

您可以轮询zmq.POLLOUT,以检查发送是否成功.

或者,为了处理可能失败的对等体的情况,a:

worker.send(msg, zmq.NOBLOCK)
Run Code Online (Sandbox Code Playgroud)

可能就足够了,它总会立即返回 - 如果发送无法完成则引发ZMQError(zmq.EAGAIN).

  • 正如@Adobri和@mknaf所说:如果使用`zmq.RCVTIMEO`,你还需要设置`zmq.LINGER`,否则即使超时后套接字仍然不会关闭.在Python中,它是`socket.setsockopt(zmq.RCVTIMEO,1000)``socket.setsockopt(zmq.LINGER,0)``message = socket.recv()` (3认同)

Jes*_*ose 15

在我提到user938949的答案和http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/后,这是我做的快速黑客攻击.如果你做得更好,请发表你的答案,我会建议你的答案.

对于那些希望持久解决可靠性问题的人,请参阅http://zguide.zeromq.org/page:all#toc64

zeromq 3.0版(beta ATM)支持ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中的超时.http://api.zeromq.org/3-0:zmq-setsockopt

服务器

zmq.NOBLOCK确保当客户端不存在时,send()不会阻塞.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"
Run Code Online (Sandbox Code Playgroud)

客户

轮询对象可以侦听在许多recieving插座(见"Python的多处理ZeroMQ"上面的联系.我联系,只在work_receiver.在无限循环,与1000毫秒的间隔客户端投票.该袜子对象返回空,如果没有消息已经在那个时候收到了.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
Run Code Online (Sandbox Code Playgroud)


Ado*_*bri 9

如果您使用ZMQ_NOBLOCK,send将不会阻止,但如果您尝试关闭套接字和上下文,此步骤将阻止程序退出..

原因是套接字等待任何对等体,以确保传出消息排队.要立即关闭套接字并从缓冲区刷新传出消息,请使用ZMQ_LINGER并将其设置为0.

  • 如果你不使用zmq.LINGER,zmq.RCVTIMEO将无法帮助你,因为在超时后套接字仍然不会关闭.这应该添加到所选答案中. (2认同)

Mat*_*tin 5

如果您只是在等待一个套接字,而不是创建一个Poller,您可以这样做:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"
Run Code Online (Sandbox Code Playgroud)

如果您的超时根据情况发生变化,您可以使用它,而不是设置work_receiver.RCVTIMEO.