什么是消费来自rabbitmq的消息并通过其客户端连接转发它们的"扭曲"方式?

Jai*_*gus 5 twisted rabbitmq websocket pika autobahn

我正在写一个websocket服务器twisted来学习框架.它将从rabbitmq代理接收消息,并向连接的客户端发送更新.如果我想通过许多客户端连接一次广播/多播多个消息,是调用(仅作为示例)deferToThread(channel.basic_consume, queue),还是callInThread(" ")这样做的一个非常好的选择?

如果没有,twisted消费消息rabbitmq并将其转发给连接的客户端的方式是什么?

到目前为止,我的策略是:

reactor_thread:侦听端口(x)以设置和维护客户端连接

other_thread:订阅rabbitmq队列并使用消息(如果有的话)(永远继续)

not*_*.no 4

调用(仅作为示例) deferToThread(channel.basic_consume, queue) 或 callInThread(" ") 是这样做的一个很好的选择吗?

在这种情况下,使用线程并不会真正提供太多好处,因为消息已经在 RabbitMQ 中排队。我过去也遇到过类似的情况,我可以向您提供我在不使用线程的情况下解决问题的方法的高级概述。免责声明:我已经有一两年没有使用 RabbitMQ 或 Websockets 了,所以我的知识可能有点模糊。

列出已连接的客户端

假设您使用的autobahn是 websocket,您可以在工厂类 ( autobahn.twisted.websocket.WebSocketServerFactory) 中添加一个变量,该变量将跟踪连接的客户端。要么list要么dict都会工作得很好。

factory = WebSocketServerFactory()
factory.connection_list = []
Run Code Online (Sandbox Code Playgroud)

连接建立后,该connection_list变量将存储协议对象 ( )。autobahn.twisted.websocket.WebSocketServerProtocol在协议中,您需要重载该connectionMade函数以将协议(self在本例中)附加到self.factory.connection_list.

def connectionMade(self):
    super(WSProtocol, self).connectionMade()
    self.factory.connection_list.append(self)
Run Code Online (Sandbox Code Playgroud)

为了灵活性,最好创建类似“onConnect deferred”的东西,但这就是它的要点。也许autobahn提供一个接口来做到这一点。

RabbitMQ

使用pika,您可以通过此示例异步消费消息。根据需要更改通道和交换名称,使其适合您的设置。然后我们将进行 2 项更改。首先,我们将factory.connection_list传递给回调,然后当消息被使用时,我们将其写入连接的客户端协议。

@defer.inlineCallbacks
def run(connection, proto_list):
    #...
    l = task.LoopingCall(read, queue_object, proto_list)
    l.start(0.01)

@defer.inlineCallbacks
def read(queue_object, proto_list):
    #...
    if body:
        print(body)
        for client in sorted(proto_list):
            yield client.write(body)

    yield ch.basic_ack(delivery_tag=method.delivery_tag)

#...
d.addCallback(run, factory.connection_list)
reactor.run()
Run Code Online (Sandbox Code Playgroud)

read回调函数中,每次消费消息时,循环任务都会迭代已连接客户端的列表并向它们发送消息。