myW*_*SON 10 c++ message-queue zeromq
所以我正在阅读这篇关于如何在ZMQ中为(X)PUB/(X)SUB消息传递创建代理/代理的文章.建筑物的外观很漂亮:

但是,当我查看XSUB套接字描述时,我不知道如何通过它转发所有订阅,因为它Outgoing routing strategy是 N/A
那么如何在ZeroMQ中实现(un)订阅转发,这种转发应用程序的最小用户代码是什么(可以插入简单的Publisher和Subscriber样本之间)?
min*_*nrk 15
XPUB确实接收消息 - 它接收的唯一消息是来自连接订户的订阅,这些消息应该通过XSUB按原样向上游转发.
传递消息的最简单方法是使用zmq_proxy:
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)
Run Code Online (Sandbox Code Playgroud)
它将向/从xpub和xsub中继消息.(可选)您可以添加PUB套接字以监控在任一方向上通过的流量.
如果你想让中间的用户代码实现额外的路由逻辑,你会做这样的事情,重新实现内部循环zmq_proxy:
def broker(ctx):
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
poller = zmq.Poller()
poller.register(xpub, zmq.POLLIN)
poller.register(xsub, zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if xpub in events:
message = xpub.recv_multipart()
print "[BROKER] subscription message: %r" % message[0]
xsub.send_multipart(message)
if xsub in events:
message = xsub.recv_multipart()
# print "publishing message: %r" % message
xpub.send_multipart(message)
# insert user code here
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6987 次 |
| 最近记录: |