ZMQ: No subscription message on XPUB socket for multiple subscribers (Last Value Caching pattern)

ora*_*nge 7 python sockets proxy publish-subscribe zeromq

I implemented the Last Value Caching (LVC) example of ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), but I can't get a second subscriber to register at the backend.

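When the first subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but the second subscriber (same topic) doesn't even register (if backend in events: is never true). Everything else works fine. Data gets passed from the publisher to the subscribers (all of them).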

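What could be the reason for this? Is the way the backend is connected correct? Is this pattern only supposed to work with the first subscriber?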

Update

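When I subscribe the second subscriber to another topic, I get the right behaviour (i.e. \x01 when subscribing). It really seems to work only for the first subscriber. Is this a bug in ZeroMQ?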

Update 2

Below is a minimal working example showing that the LVC pattern is not working (at least not the way it is implemented here).

# subscriber.py
import zmq

def main():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5558")

    # Subscribe only to the topic this subscriber cares about
    print 'subscribing (sub side)'
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Print any update received on the subscribed topic
        if sub in events:
            msg = sub.recv_multipart()
            topic, current = msg
            print 'received %s on topic %s' % (current, topic)

if __name__ == '__main__':
    main() 

And here is the broker (as in the example, but with more verbosity and an integrated publisher).

# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time


class Publisher(threading.Thread):
    def __init__(self):
        super(Publisher, self).__init__()

    def run(self):
        time.sleep(10)
        ctx = zmq.Context.instance()
        pub = ctx.socket(zmq.PUB)
        pub.connect("tcp://127.0.0.1:5557")

        cnt = 0
        while True:
            msg = 'hello %d' % cnt
            print 'publisher is publishing %s' % msg
            pub.send_multipart(['my-topic', msg])
            cnt += 1
            time.sleep(5)


def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)


    # launch a publisher
    p = Publisher()
    p.daemon = True
    p.start()

    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        ### this is where it fails for the 2nd subscriber. 
        ### There's never even an event from the backend 
        ### in events when the 2nd subscriber is subscribing.

        # When we get a new subscription we pull data from the cache:
        if backend in events:
            print 'message from subscriber'
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                print ' => subscribe to %s' % topic
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])
            elif event[0] == b'\x00':
                topic = event[1:]
                print ' => unsubscribe from %s' % topic

if __name__ == '__main__':
    main()
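The broker's comment says each XPUB event is one byte (0 = unsubscribe, 1 = subscribe) followed by the topic. The check `event[0] == b'\x01'` holds on Python 2, which is what this question uses, but on Python 3 indexing a bytes object yields an int, so that comparison would never be true. Below is a minimal sketch of a version-independent parser; the helper name `parse_subscription_event` is hypothetical and not part of pyzmq.

```python
def parse_subscription_event(event):
    """Split an XPUB event frame into (is_subscribe, topic).

    Hypothetical helper, not a pyzmq API. The frame layout (one
    marker byte followed by the topic) is taken from the comment
    in broker.py.
    """
    if not event:
        raise ValueError("empty event frame")
    # Slicing returns bytes on both Python 2 and Python 3,
    # whereas event[0] is an int on Python 3.
    marker = event[:1]
    topic = event[1:]
    if marker == b"\x01":
        return True, topic
    if marker == b"\x00":
        return False, topic
    raise ValueError("not a subscription event: %r" % (event,))


if __name__ == "__main__":
    print(parse_subscription_event(b"\x01my-topic"))
    print(parse_subscription_event(b"\x00my-topic"))
```

In the broker loop this could replace the two `event[0] == ...` comparisons, with the cache lookup running only when the first element of the returned tuple is true.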

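Running this code (1 x broker.py, 2 x subscriber.py) shows that the first subscriber registers at the broker as expected (\x01 and cache lookup), but the second subscriber does not get registered the same way. Interestingly, the second subscriber is hooked up to the pub/sub channel: after a while (10 seconds), both subscribers receive data from the publisher.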

This is very strange. Perhaps some of my libraries are outdated. Here's what I have:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'

$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
  Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ?
Optional: libpgm ?, libsodium ?

Update 3

This behaviour can also be observed with zeromq 4.1.2 and pyzmq-14.7.0 (with or without libpgm and libsodium installed).

Update 4

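Another observation suggests that the first subscriber is somehow handled differently: the first subscriber is the only one that unsubscribes from the XPUB socket (backend) in the expected way, by preceding its subscription topic with \x00. The other subscribers (I tried more than 2) stay mute on the backend channel (although they do receive messages).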

Update 5

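I hope I'm not going down a rabbit hole, but I've looked into the czmq bindings and ran my Python example in C. The results are the same, so I guess this is not a problem with the bindings but with libzmq.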

I also verified that the second subscriber is sending a subscribe message, and I can indeed see it on the wire:

First subscribe:

0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
0040  70 69 63                                           pic              

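Second subscribe message, with the differences from the one above marked and explained. The same data is sent in the subscribe frame.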

                               identification
                               v
0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                             src port      sequence number
                                  v        v  v  v  v
0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........

Acknowledgement number   window scaling factor
      v  v  v  v           v
0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......

timestamp value  timestamp echo reply
            v           v  v   |<-------- data -------
0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to

      ------>|
0040  70 69 63                                           pic              
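The data bytes marked in the dump (`00 09 01 6d 79 2d 74 6f 70 69 63`) can be decoded by hand. The sketch below assumes ZMTP framing in which a flags byte is followed by a one-byte length for short frames (or an eight-byte big-endian length when the LONG flag is set); the function name `decode_frame` and the flag constants are illustrative, not part of any library. It requires Python 3.

```python
# Flag bits as assumed for ZMTP framing (illustrative constants).
FLAG_MORE = 0x01     # more frames follow in this message
FLAG_LONG = 0x02     # length is 8 bytes instead of 1
FLAG_COMMAND = 0x04  # frame is a command, not message data


def decode_frame(data):
    """Return (flags, body) for a single frame at the start of data."""
    flags = data[0]
    if flags & FLAG_LONG:
        size = int.from_bytes(data[1:9], "big")
        offset = 9
    else:
        size = data[1]
        offset = 2
    body = data[offset:offset + size]
    if len(body) != size:
        raise ValueError("truncated frame")
    return flags, body


# TCP payload copied from the packet capture (identical in both dumps).
captured = bytes.fromhex("00 09 01 6d 79 2d 74 6f 70 69 63")
flags, body = decode_frame(captured)

if __name__ == "__main__":
    print("flags:", flags)
    print("subscribe marker:", body[:1])
    print("topic:", body[1:])
```

Decoded this way, both captures carry a nine-byte body consisting of the subscribe marker \x01 followed by the topic my-topic, which supports the observation that the second subscriber does send its subscription.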

ora*_*nge 8

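I found the solution to this problem. Even though I had read the docs front to back and back to front, I had not seen it. The key is XPUB_VERBOSE. Add this line after the backend initialisation and everything works fine: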

backend.setsockopt(zmq.XPUB_VERBOSE, True)
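To see the effect of the option in isolation, here is a minimal sketch that assumes pyzmq is installed. It connects two SUB sockets with the same topic to one XPUB socket over inproc and collects the subscription events that the XPUB socket delivers. The function name `collect_subscription_events`, the endpoint name, and the 500 ms poll timeout are choices made for this illustration only.

```python
import zmq


def collect_subscription_events(verbose, n_subscribers=2, topic=b"my-topic"):
    """Return the raw subscription events an XPUB socket receives."""
    ctx = zmq.Context()
    xpub = ctx.socket(zmq.XPUB)
    xpub.setsockopt(zmq.LINGER, 0)
    # The option from the answer: 1 delivers every subscription,
    # 0 (the default) delivers only the first one per topic.
    xpub.setsockopt(zmq.XPUB_VERBOSE, 1 if verbose else 0)
    xpub.bind("inproc://lvc-verbose-demo")

    subs = []
    for _ in range(n_subscribers):
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.LINGER, 0)
        sub.connect("inproc://lvc-verbose-demo")
        sub.setsockopt(zmq.SUBSCRIBE, topic)
        subs.append(sub)

    # Drain events until the socket stays quiet for 500 ms.
    events = []
    while xpub.poll(500):
        events.append(xpub.recv())

    for sock in subs + [xpub]:
        sock.close()
    ctx.term()
    return events


if __name__ == "__main__":
    print("default:", collect_subscription_events(verbose=False))
    print("verbose:", collect_subscription_events(verbose=True))
```

With the default setting the second, duplicate subscription should be filtered out, which is the behaviour described in the question; with the option enabled each subscriber should produce its own event, so the broker gets a chance to send the cached value to every newcomer.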

Here is an excerpt from the official documentation:

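ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets

Sets the XPUB socket behaviour on new subscriptions and unsubscriptions. A value of 0 is the default and passes only new subscription messages to upstream. A value of 1 passes all subscription messages upstream.

Option value type: int
Option value unit: 0, 1
Default value: 0
Applicable socket types: ZMQ_XPUB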
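The filtering described in this excerpt can be pictured with a toy model. The sketch below is not libzmq code: the class `XpubSubscriptionModel` and its method are hypothetical, and it only models subscriptions (unsubscriptions are left out). It tracks which peers have subscribed to each topic and passes an event upstream only for the first subscriber of a topic, unless verbose mode is on.

```python
class XpubSubscriptionModel(object):
    """Toy model of duplicate-subscription filtering (not libzmq code)."""

    def __init__(self, verbose=False):
        self.verbose = verbose
        self._peers_by_topic = {}  # topic -> set of peer identifiers

    def subscribe(self, peer, topic):
        """Record a subscription; return the upstream event or None."""
        peers = self._peers_by_topic.setdefault(topic, set())
        is_new_topic = not peers
        peers.add(peer)
        if is_new_topic or self.verbose:
            return b"\x01" + topic
        return None  # duplicate subscription, filtered out


if __name__ == "__main__":
    default = XpubSubscriptionModel(verbose=False)
    print(default.subscribe("sub-1", b"my-topic"))     # first subscriber
    print(default.subscribe("sub-2", b"my-topic"))     # same topic again
    print(default.subscribe("sub-2", b"other-topic"))  # a different topic
```

Under this model the second subscriber to my-topic produces no event, while a subscription to a different topic does, which matches both observations in the question.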

Pieter Hintjens has some more information on this in his blog. This is the relevant section:

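A few months ago we added a neat little option (ZMQ_XPUB_VERBOSE) to XPUB sockets that disables the filtering of duplicate subscriptions. This now works for any number of subscribers. We use it as follows: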

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

The LVC pattern description should be updated to reflect this setting, as the pattern won't work otherwise.