ZeroRPC发布订阅

use*_*657 4 publish-subscribe zeromq zerorpc

我想在我的服务器之间建立一个基于事件的系统.例如,当包装我的数据库逻辑的服务器改变状态时,我希望它通知我的其他服务器.发布/订阅设计似乎是理想的,我听说过有关ZeroRPC的好消息.

有些人提到使用zerorpc流来完成pub/sub,但是对于我来说,使用流媒体触发事件的方式并不明显.

小智 10

在dotCloud,我们使用了大量的pub/sub trough zerorpc流.让我来描述一下我们这样做的方法.

综上所述

我们公开了一个用@ zerorpc.stream修饰的流方法.调用此方法时,会将gevent.queue添加到集合中.然后该方法将永远循环,产生到达队列的每个消息.当此方法终止时(因为客户端已断开连接),队列将从集合中删除.

要发布,只需在集合中注册的每个队列上发布要发布的消息.此时,您必须决定要对慢速消费者做什么(断开它们,将它们排队到一定限度和/或丢弃新消息).

使用zerorpc-python的实现示例:

订阅部分

class MyService(object):
    def __init__(self):
        self._subscribers = set()

    @zerorpc.stream
    def subscribe(self):
        try:
            queue = gevent.queue.Queue()
            self._subscribers.add(queue)
            for msg in queue:
                yield msg
        finally:
            self._subscribers.remove(queue)
Run Code Online (Sandbox Code Playgroud)

subscribe方法只是将事件队列添加到集合中.然后永远消耗队列,直到: - 队列由Stop​​Iteration消息结束(参见gevent.queue.Queue文档) - 运行subscribe函数的greenlet被终止(通常是因为客户端断开连接)

在这两种情况下,都会执行finally语句,并从订阅者列表中删除队列.

请注意,此时可以限制队列的大小:...Queue(maxsize=42).

出版部分

class MyService(object):
    [...]

    def _publish(self, msg):
        for queue in self._subscribers:
            if queue.size < 42:
                queue.put(msg)
Run Code Online (Sandbox Code Playgroud)

调用此方法以发布消息.它将遍历所有订户队列以将消息放入其中.在我的示例中,如果队列达到特定大小,我将丢弃该消息.但是你想在那里应用什么样的模式是没有限制的.

您可以将订阅者的greenlet实例存储在集合中,然后在队列已满时将其终止,从而有效地断开慢速客户端(您甚至可以尝试发送消息通知客户端太慢).您还可以等待所有消费者在从_publish等返回之前并行处理消息.天空是我朋友的极限!

希望有所帮助!

  • 我们需要ZeroRPC中的良好文档,目前它不存在. (8认同)