python:在 pytest 装置中运行异步例程的正确方法?

Vin*_*nce 5 python websocket pytest python-multithreading python-asyncio

下面的测试通过了,但我怀疑我是否正确使用了 asyncio:

  • 该代码混合了异步和线程
  • 测试通过但从未退出(可能是因为“loop.run_until_complete”永远不会结束)
import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(websocket_server())
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    # no idea how to stop the loop here
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"
Run Code Online (Sandbox Code Playgroud)

(注意:为了停止循环,我尝试了此处提出的解决方案(How to stop websocket server Created with websockets.serve()?),但这导致服务器无法启动)

jsb*_*eno 3

您需要在运行服务器的线程中添加一些其他代码来接收来自主线程的信号并自行关闭。

幸运的是,由于 asyncio 的性质,该控件可以构建在单独的函数中,而不会干扰实现服务器本身的函数。只有创建循环并调用服务器任务的函数必须在另一个任务中安排一些代码来检查来自其他线程的信号是否到达 - asyncio 将负责这两个任务依次运行。

跨线程通信的正确方法是使用队列 - 尽管在这种情况下,即使是模块级(全局)变量也可以工作。请注意,即使存在“异步队列” - 在这种情况下,我们希望将消息从一个线程发送到另一个线程,并且没有两个异步任务尝试并行读取它,因此我们使用“传统”多线程模块Queue中的类queue

另外,不相关,但我将启动 asyncio 循环的代码更改为新方式,使用asyncio.run,而不使用第一个具有 asyncio 功能的 Python 版本中所需的所有样板。

import asyncio
import threading
import pytest
import websockets
from queue import Queue, Empty

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

async def coordinate(q):
    server = asyncio.create_task(websocket_server())
    while True:
        await asyncio.sleep(0)  # this is necessary to allow the asyncio loop to switch tasks.
        try:
            q.get_nowait()
        except Empty:
            pass
        else:  # block will run whenever there is _any_ message in the queue.
            server.cancel()
            return
    server.cancel()


def _run_server(q):
    asyncio.run(coordinate(q))

@pytest.fixture
def run_server():
    command  = Queue()
    thread = threading.Thread(target=_run_server, args=(command,))
    thread.start()
    yield thread
    command.put("quit")
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

Run Code Online (Sandbox Code Playgroud)

第二种方法不需要服务器线程中的消息监视代码,只需从运行测试的线程中进行调用以取消服务器任务。Asyncio 在调用中对此有一个预置loop.call_soon_threadsafe- 我们只需要在原始线程中引用循环和服务器任务(这样我们就可以获取其.cancel方法) - 这可以通过模块级(全局)变量来完成。“run_server”函数不会返回,因此需要全局变量,因为它们的值一旦设置就可以在父线程中检查。否则,如果您由于全局状态而不想诉诸这些,则也可以使用线程队列将“循环”和“服务器”对象从子线程发布到固定代码。使用全局变量会阻止测试正确并行运行。

import asyncio
import threading
import pytest
import websockets

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def websocket_server():
    async with websockets.serve(echo, "localhost", 8765):
        await asyncio.Future()

def _run_server():
    global loop, server
    loop = asyncio.new_event_loop()
    server = loop.create_task(websocket_server())
    try:
        loop.run_until_complete(server)
    except asyncio.CancelledError:
        pass
    loop.close()

@pytest.fixture
def run_server():
    thread = threading.Thread(target=_run_server)
    thread.start()
    yield thread
    loop.call_soon_threadsafe(server.cancel)
    thread.join()


@pytest.mark.asyncio
async def test_websocket(run_server):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send("Hello!")
        response = await websocket.recv()
        assert response == "Hello!"

Run Code Online (Sandbox Code Playgroud)

这次我们需要显式引用 asyncio 循环对象本身,因此asyncio.run我们不调用 ,而是执行“create_loop”、“run_until_complete”调用。

(感谢您提供完整的、独立的、可执行的、最小的示例 - 没有它我就不会花时间解决这个问题)