在 python asyncio 上发布订阅

Sat*_*urn 1 python websocket python-asyncio

我认为我的问题很简单也很愚蠢,但是我读了很多材料,无法想象如何做我想要的事情。

所以,我使用websockets库,并且我有这个算法:

# 1. get connection and start handle it
async def main_request_handler(ws, path):
    proxy = Proxy()
    try:
        await proxy.start(ws, path)
Run Code Online (Sandbox Code Playgroud)

2.在start内部我创建第二个websocket来传递请求ws并接收答案并将其发送到ws

while True:
    request_raw = await self.ws_server.recv()
    await self.process_request_from_server(request_raw)
Run Code Online (Sandbox Code Playgroud)

问题是,我需要为多个 ws客户端使用一个 websocket 服务器连接,并且我需要向每个人传递相同的答案ws_server。现在我只得到一个响应,因为 .recv() 仅返回其中一个“订阅者”的值。如何解决这个问题?请注意,我使用while Trueasync

Udi*_*Udi 5

这是一个非常简单的 pub/sub websockets 服务器示例

import asyncio
import websockets

connections = set()
n = 0


async def handler(websocket, path):
    global n

    if path == "/sub":
        n = n + 1
        i = n
        connections.add(websocket)
        print("adding subscriber #", i)
        try:
            async for msg in websocket:
                pass  # ignore
        except websockets.ConnectionClosed:
            pass
        finally:
            print("removing subscriber #", i)
            connections.remove(websocket)

    elif path == "/pub":
        async for msg in websocket:
            print("<", msg)
            for ws in connections:
                asyncio.ensure_future(ws.send(msg))


start_server = websockets.serve(handler, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)

还有一个订阅者客户端示例(运行其中一些):

import asyncio
import websockets


async def listen():
    async with websockets.connect('ws://localhost:8765/sub') as websocket:
        while True:
            greeting = await websocket.recv()
            print("< {}".format(greeting))


asyncio.get_event_loop().run_until_complete(listen())
Run Code Online (Sandbox Code Playgroud)

和出版商:

import asyncio
import websockets


async def say():
    async with websockets.connect('ws://localhost:8765/pub') as websocket:
        while True:
            msg = input("Enter message:")
            if not msg:
                break
            await websocket.send(msg)


asyncio.get_event_loop().run_until_complete(say())
Run Code Online (Sandbox Code Playgroud)