如何在ZMQ中为(X)PUB /(X)SUB消息传递实现代理/代理?

myW*_*SON 10 c++ message-queue zeromq

所以我正在阅读这篇关于如何在ZMQ中为(X)PUB/(X)SUB消息传递创建代理/代理的文章.建筑物的外观很漂亮:

数据流:用户代码 - > PUB  - > XSUB  - >用户代码 - > XPUB  - > SUB  - >用户代码;  订阅流程:用户代码< -  PUB < -  XSUB < - 用户代码< -  XPUB < -  SUB < - 用户代码;

但是,当我查看XSUB套接字描述时,我不知道如何通过它转发所有订阅,因为它Outgoing routing strategyN/A

那么如何在ZeroMQ中实现(un)订阅转发,这种转发应用程序的最小用户代码是什么(可以插入简单的PublisherSubscriber样本之间)?

min*_*nrk 15

XPUB确实接收消息 - 它接收的唯一消息是来自连接订户的订阅,这些消息应该通过XSUB按原样向上游转发.

传递消息的最简单方法是使用zmq_proxy:

xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)
Run Code Online (Sandbox Code Playgroud)

它将向/从xpub和xsub中继消息.(可选)您可以添加PUB套接字以监控在任一方向上通过的流量.

如果你想让中间的用户代码实现额外的路由逻辑,你会做这样的事情,重新实现内部循环zmq_proxy:

def broker(ctx):
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while True:
        events = dict(poller.poll(1000))
        if xpub in events:
            message = xpub.recv_multipart()
            print "[BROKER] subscription message: %r" % message[0]
            xsub.send_multipart(message)
        if xsub in events:
            message = xsub.recv_multipart()
            # print "publishing message: %r" % message
            xpub.send_multipart(message)

        # insert user code here
Run Code Online (Sandbox Code Playgroud)

完整工作(Python)的例子