Python ZMQ响应器未收到消息

GöC*_*öCo 5 python sockets networking zeromq pyzmq

我正在尝试一个简单的zmq脚本,但不知何故响应者没有收到第一条消息。

响应者看起来像这样:

def main():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:{}".format(5560))
    print("connected ...")
    while True:
          #  Wait for next request from client
          message = socket.recv_pyobj()
          #print (message)
          print(message)
if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

我正在从另一个进程发送带有以下代码的请求:

def main():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:{}".format(5560))
    print("sending object")
    socket.send_pyobj("ok")
    print("done")

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

有人知道为什么它没有到达吗?

use*_*197 -1

有人知道为什么它没有到达吗?

哦,当然我有。

有几个要点决定了基于 ZeroMQ 的分布式计算系统如何工作。

如果您不熟悉使用 ZeroMQ 或其其他衍生产品(等人),请务必不要错过 Pieter Hintjen 的必读书籍“Code Connected. Volume 1”。

有两个地方可能会丢失(或主要是无法送达)消息:

  • 尚未准备好进程,假设接收消息,是第一个
  • 成功时没有可用的资源(端口).bind(),是第二个

与恶劣网络传输条件相关的问题不适用于localhost仅(vmci://内部端口抽象网络的虚拟化)实验


可治愈:

def main():
    context = zmq.Context()
    socket = context.socket( zmq.REQ )
    socket.setsockopt(       zmq.LINGER,    0 )              # ALWAYS PREVENT BLOCKING
    socket.setsockopt(       zmq.IMMEDIATE, 1 )              # BLOCK UNTIL CONN-READY
    #ocket.setsockpt(        zmq.ZMQ_HANDSHAKE_IVL, ... )    # IF TWEAKING NETWORK-WIDE
    # OR:
    # a stone-age wait for the other part get started in a one-computer demo:
    # sleep( 20 )
    # :o)
    socket.connect(         "tcp://localhost:{}".format( 5560 ) )
    print(                  "Will try to dispatch an object to Context() instance" )
    socket.send_pyobj(      "ok" )
    print(                  ".send() method has returned from a blocking-call mode" )
    ...
    #--------------------------------------------------------# ALWAYS
    socket.close()                                           # ALWAYS RELEASE RESOURCES
    context.term()                                           # ALWAYS RELEASE RESOURCES
    #                                                        # ALWAYS (not all versions
    #                                                        #         have "friendly"
    #                                                        #         defeaults to rely
    #                                                        #         on others,
    #                                                        #         so be explicit)
    #--------------------------------------------------------# ALWAYS
Run Code Online (Sandbox Code Playgroud)

一侧,显然不一定是REP,但这里它更适合,因为while必须.bind(),其他仅.connect()适合已知的连接目标:

def main():
    context = zmq.Context()
    socket = context.socket( zmq.REP )
    socket.setsockopt(       zmq.LINGER,    0 )              # ALWAYS PREVENT BLOCKING
    socket.setsockopt(       zmq.IMMEDIATE, 1 )              # BLOCK UNTIL CONN-READY
    #ocket.setsockpt(        zmq.ZMQ_HANDSHAKE_IVL, ... )    # IF TWEAKING NETWORK-WIDE
    socket.bind(            "tcp://localhost:{}".format( 5560 ) )
    print(                  ".bind() ...")
    try:
        while True:             # Wait for next request from client:
              message = socket.recv_pyobj()
              print( message )
    except:
        print( "EXC'd" )
    finally:
        #----------------------------------------------------# ALWAYS
        socket.unbind( "tcp://localhost:{}".format( 5560 ) ) # ALWAYS RELEASE PORT
        socket.close()                                       # ALWAYS RELEASE RESOURCES
        context.term()                                       # ALWAYS RELEASE RESOURCES
        #                                                    # ALWAYS (not all versions
        #                                                    #         have "friendly"
        #                                                    #         defeaults to rely
        #                                                    #         on others,
        #                                                    #         so be explicit)
        #----------------------------------------------------# ALWAYS
Run Code Online (Sandbox Code Playgroud)

最后但并非最不重要的一点是,这将开始起作用,但由于可能错过了行为原型的原则,它将陷入无限的等待REQ/REP。一个必须提出问题(REQ.send()-s),另一个负责答复的人必须倾听问题REP.recv(),但它也必须回答……REP.send("something")然后我们才能以两步探戈的方式向前迈进,而提问者已经也为了得到听过的答案REQ.recv()

然后,只有这样,提问者才能由另一个提问者发送另一个问题REQ.send()

因此,无限循环内的发送REQ部分(但主要是接收部分)都必须进行修改,以便接收任何第二条和更多消息,即使是在单次射击后死亡并且从不听任何答案的情况下也是如此从。REPwhile True:{...}REQREP