在 Python FastAPI 中使用 websockets 并行发送/接收

moh*_*ank 6 python websocket python-asyncio fastapi

我将尝试用一个例子来解释我在做什么,假设我正在构建一个天气客户端。浏览器通过 websocket 发送消息,例如:

{
  "city": "Chicago",
  "country": "US"
}
Run Code Online (Sandbox Code Playgroud)

服务器每 5 分钟查询一次天气,并使用最新数据更新浏览器。

现在浏览器可以发送另一条消息,例如:

{
  "city": "Bangalore",
  "country": "IN"
}
Run Code Online (Sandbox Code Playgroud)

现在我的服务器应该停止更新芝加哥的天气详细信息并开始更新有关班加罗尔的详细信息,即通过 websocket 同时发送/接收消息。我应该如何实施这个?

目前我有这个,但这只会在接收事件时更新浏览器:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    weather_client = WeatherClient(client)
    while True:
        data = await websocket.receive_json()
        weather = await weather_client.weather(data)
        await websocket.send_json(weather.dict())
Run Code Online (Sandbox Code Playgroud)

如果我移出websocket.receive_json()循环,我将无法连续收听来自浏览器的消息。我想我需要启动两个异步任务,但我不太能够确定实现,因为我是异步编程方式的新手。

Mat*_*ler 12

最简单的方法就是像您提到的将读取移到循环之外的单独任务中。在此范例中,您需要使用最新数据更新局部变量,使您的代码如下所示:

@app.websocket("/ws")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    json_data = await websocket.receive_json()

    async def read_from_socket(websocket: WebSocket):
        nonlocal json_data
        async for data in websocket.iter_json():
            json_data = data

    asyncio.create_task(read_from_socket(websocket))
    while True:
        print(f"getting weather data for {json_data}")
        await asyncio.sleep(1)  # simulate a slow call to the weather service
Run Code Online (Sandbox Code Playgroud)

请注意,我使用了iter_json异步生成器,这相当于receive_json.

这可以工作,但根据您的要求可能会出现错误。想象一下,天气服务需要 10 秒才能完成,在此期间用户通过套接字发送三个针对不同城市的请求。在上面的代码中,您只会获得用户发送的最新城市。这可能适合您的应用程序,但如果您需要跟踪用户发送的所有内容,则需要使用队列。在此范例中,您将有一个任务读取数据并将其放入队列,另一个任务从队列获取数据并查询天气服务。然后您将与 一起运行这些gather

@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        while True:
            if queue.empty():
                print(f"getting weather data for {data}")
                await asyncio.sleep(1)
            else:
                data = queue.get_nowait()
                print(f"Setting data to {data}")

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())
Run Code Online (Sandbox Code Playgroud)

这样,您就不会丢失用户发送的数据。在上面的示例中,我仅获取用户请求的最新天气数据,但您仍然可以访问发送的所有数据。

编辑:要回答您在评论中的问题,队列方法可能最好在新请求到来时取消任务。基本上将您希望能够取消的长时间运行的任务移动到它自己的协程函数中(在本例中)read_and_send_to_client并且将其作为任务运行。当新数据进来时,如果该任务尚未完成,请取消它,然后创建一个新任务。

async def read_and_send_to_client(data):
    print(f'reading {data} from client')
    await asyncio.sleep(10) # simulate a slow call
    print(f'finished reading {data}, sending to websocket client')


@app.websocket("/wsqueue")
async def read_webscoket(websocket: WebSocket):
    await websocket.accept()
    queue = asyncio.queues.Queue()

    async def read_from_socket(websocket: WebSocket):
        async for data in websocket.iter_json():
            print(f"putting {data} in the queue")
            queue.put_nowait(data)

    async def get_data_and_send():
        data = await queue.get()
        fetch_task = asyncio.create_task(read_and_send_to_client(data))
        while True:
            data = await queue.get()
            if not fetch_task.done():
                print(f'Got new data while task not complete, canceling.')
                fetch_task.cancel()
            fetch_task = asyncio.create_task(read_and_send_to_client(data))

    await asyncio.gather(read_from_socket(websocket), get_data_and_send())
Run Code Online (Sandbox Code Playgroud)