Python + ZMQ:在当前状态下无法完成操作

Qui*_*oNa 11 python zeromq python-3.x pyzmq

我试图通过使用请求 - 回复模式通过zeromq获得一个python程序与另一个python程序进行通信.客户端程序应该向回复的服务器程序发送请求.

我有两台服务器,当一台服务器发生故障时,另一台服务器接管.当第一台服务器工作时,通信工作正常,但是,当第一台服务器发生故障时,当我向第二台服务器发出请求时,我看到错误:

zmp.error.ZMQError:无法在当前状态下完成操作

服务器1的代码:

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5677")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

服务器代码2:

# Run the server
while True:

    # Define the socket using the "Context"
    sock = context.socket(zmq.REP)
    sock.bind("tcp://127.0.0.1:5877")
    data = sock.recv().decode("utf-8")
    res = "Recvd"
    sock.send(res.encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)

客户代码:

# ZeroMQ Context For distributed Message amogst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")

try:
    sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
    socks_1.setsockopt(zmq.RCVTIMEO, 1000)
    socks_1.setsockopt(zmq.LINGER, 0)
    data = socks_1.recv().decode('utf-8') #receive data from the main node  

except:
    try:
        #when server one fails
        sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
        socks_2.setsockopt(zmq.RCVTIMEO, 1000)
        socks_2.setsockopt(zmq.LINGER, 0)
        data = socks_2.recv().decode('utf-8')
    except Exception as e:
         print(str(e))
Run Code Online (Sandbox Code Playgroud)

这种方法有什么问题?我该如何解决这个问题?

use*_*197 9

问:我该如何解决这个问题?
A:避免已知的REQ/REP死锁风险!

虽然ZeroMQ是一个强有力的框架,了解其内部结构是必要的稳健可靠的分布式系统设计和原型。

仔细观察后,使用常见的REQ/REP正式通信模式可能会使(并且确实使)对方陷入相互僵局:一方期望另一方做一个永远不会完成的步骤,并且没有办法以脱离死锁状态。

有关更多说明细节FSA 示意图,请参阅此帖子

接下来,故障转移系统必须能够承受其自身组件的任何冲突。因此,必须很好地设计分布式系统状态信号并尽可能避免对元素 FSA 设计/步进/阻塞的依赖,否则,故障安全行为只是一种错觉。

始终小心处理资源,不要将 ZeroMQ 智能信令/消息传递的组件视为任何类型的“一次性用品”,在学术示例中可能会容忍这样做,而不是在生产系统环境中。您仍然需要支付成本(时间、资源分配/取消分配/垃圾收集)。正如评论中所指出的,永远不要在没有适当控制的情况下创建/分配资源。while True: .socket(); .bind(); .send();原则上是严重错误的,并且使设计的其余部分恶化。


aba*_*sar 5

在服务器端,“接收”和“发送”对至关重要。我遇到了类似的问题,而 socket.send 被错过了。

def zmq_listen():
    global counter
    message = socket_.recv().decode("utf-8")
    logger.info(f"[{counter}] Message: {message}")
    request = json.loads(message)
    request["msg_id"] = f"m{counter}"
    ack = {"msg_id": request["msg_id"]}
    socket_.send(json.dumps(ack).encode("utf-8"))
    return request
Run Code Online (Sandbox Code Playgroud)