Qui*_*oNa 11 python zeromq python-3.x pyzmq
我试图通过使用请求 - 回复模式通过zeromq获得一个python程序与另一个python程序进行通信.客户端程序应该向回复的服务器程序发送请求.
我有两台服务器,当一台服务器发生故障时,另一台服务器接管.当第一台服务器工作时,通信工作正常,但是,当第一台服务器发生故障时,当我向第二台服务器发出请求时,我看到错误:
zmp.error.ZMQError:无法在当前状态下完成操作
服务器1的代码:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5677")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
服务器代码2:
# Run the server
while True:
# Define the socket using the "Context"
sock = context.socket(zmq.REP)
sock.bind("tcp://127.0.0.1:5877")
data = sock.recv().decode("utf-8")
res = "Recvd"
sock.send(res.encode('utf-8'))
Run Code Online (Sandbox Code Playgroud)
客户代码:
# ZeroMQ Context For distributed Message amogst processes
context = zmq.Context()
sock_1 = context.socket(zmq.REQ)
sock_2 = context.socket(zmq.REQ)
sock_1.connect("tcp://127.0.0.1:5677")
sock_2.connect("tcp://127.0.0.1:5877")
try:
sock_1.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_1.setsockopt(zmq.RCVTIMEO, 1000)
socks_1.setsockopt(zmq.LINGER, 0)
data = socks_1.recv().decode('utf-8') #receive data from the main node
except:
try:
#when server one fails
sock_2.send(data.encode('utf-8'), zmq.NOBLOCK)
socks_2.setsockopt(zmq.RCVTIMEO, 1000)
socks_2.setsockopt(zmq.LINGER, 0)
data = socks_2.recv().decode('utf-8')
except Exception as e:
print(str(e))
Run Code Online (Sandbox Code Playgroud)
这种方法有什么问题?我该如何解决这个问题?
REQ/REP死锁风险!虽然ZeroMQ是一个强有力的框架,了解其内部结构是必要的稳健和可靠的分布式系统设计和原型。
仔细观察后,使用常见的REQ/REP正式通信模式可能会使(并且确实使)对方陷入相互僵局:一方期望另一方做一个永远不会完成的步骤,并且没有办法以脱离死锁状态。
接下来,故障转移系统必须能够承受其自身组件的任何冲突。因此,必须很好地设计分布式系统状态信号并尽可能避免对元素 FSA 设计/步进/阻塞的依赖,否则,故障安全行为只是一种错觉。
始终小心处理资源,不要将 ZeroMQ 智能信令/消息传递的组件视为任何类型的“一次性用品”,在学术示例中可能会容忍这样做,而不是在生产系统环境中。您仍然需要支付成本(时间、资源分配/取消分配/垃圾收集)。正如评论中所指出的,永远不要在没有适当控制的情况下创建/分配资源。while True: .socket(); .bind(); .send();原则上是严重错误的,并且使设计的其余部分恶化。
在服务器端,“接收”和“发送”对至关重要。我遇到了类似的问题,而 socket.send 被错过了。
def zmq_listen():
global counter
message = socket_.recv().decode("utf-8")
logger.info(f"[{counter}] Message: {message}")
request = json.loads(message)
request["msg_id"] = f"m{counter}"
ack = {"msg_id": request["msg_id"]}
socket_.send(json.dumps(ack).encode("utf-8"))
return request
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9418 次 |
| 最近记录: |