创建一组彩带和发布者

dea*_*n44 6 python zeromq pyzmq

我有许多节点将使用辅助服务来通知彼此的地址。我希望能够发布信息,以便所有其他节点都能听到。XPUB我不想在这里使用套接字,因为我希望分发该系统。

我尝试过的东西可以总结为:

1创建一个PUB套接字,

def pub_stream(self):
     self.pub = self.context.socket(zmq.PUB)
     self.pub.bind(self.endpoint)
Run Code Online (Sandbox Code Playgroud)

2创建一个SUB流,

def sub_stream(self):
    ioloop = IOLoop.instance()
    socket = self.context.socket(zmq.SUB)
    self.sub_stream = ZMQStream(socket, ioloop)
    self.sub_stream.on_recv(self.on_message)
    self.subs_stream.setsockopt(zmq.SUBSCRIBE, self.topic)
Run Code Online (Sandbox Code Playgroud)

3在某个时候接收所有其他节点的地址并连接到它们,

# close and restart sub_stream to get rid of any previous connections
for endpoint in endpoints:
    self.sub_stream.connect(endpoint)
Run Code Online (Sandbox Code Playgroud)

on_message虽然没有消息通过回调。我正在做的事情正确吗?如果不是,我想实现的更好方法是什么?

jam*_*vey 0

无论您决定使用哪种路由,都需要至少一个固定地址来连接,除非您可以访问多播等。

我将有一个简单的 X(pub/sub) 代理作为我单独的发现网络,允许新节点根据主题决定哪些其他节点感兴趣。

简易版

  • 创建每端具有固定地址(DNS 等)的单个 XSUB/XPUB 代理
  • 节点启动
    • 连接到代理的XSUB端口并广播其主题、地址和数据端口
    • 连接到 XPUB 端口并订阅感兴趣的节点主题
      • 使用返回的连接信息,它根据连接信息将其数据套接字连接到节点的数据套接字。

可靠版本

  • 添加具有负载均衡器/虚拟 IP 的多个发现代理以涵盖容错等。
  • 节点应在计时器上发送发现消息
    • 允许晚加入的节点进行连接
    • 允许连接的节点发现失败的节点(除了依赖 tcp 超时)

混合动力版

在现实/生产中,我使用 Zeromq 以及任何其他可用的(更合适的)服务。这样我就不会尝试重新发明轮子(为了发现)而只是用 Zeromq 来进行订阅/数据工作。

例如,如果我使用云提供商将我的系统分布在许多区域,为什么不使用他们提供的发现服务。

AWS(作为一个混合示例)

对于 AWS,我使用以下组件来允许发现/可靠性/故障转移

  • 带健康检查的 Route53 (DNS)
  • 云地图(DNS 等)
  • 弹性负载均衡器(任何自定义发现服务的前端)