Mic*_*ton 2 python pytest python-asyncio
我正在尝试对异步套接字服务器进行单元测试,并使用pytest-asynciopytest 与异步代码库兼容。服务器一旦启动,总是会通过 while 循环发送回复,并且可能花费大部分时间等待 中的传入消息client_loop()。问题是在单元测试框架终止事件循环并发出此警告之前,无法取消此任务:
任务已被销毁,但正在等待处理!
任务: < 任务待处理 coro=< Server.new_client() 已完成,在 /[...path...]/server.py:16 处定义> wait_for=< 未来待处理 cb=[< 位于 0x106d7cbe8 的 TaskWakeupMethWrapper 对象>() ]>>
我有权访问的唯一任务是由 创建的任务asyncio.create_task(),这似乎不是同一个任务。该任务如下所示:
任务: < 任务挂起 coro=< start_server() 运行在 /usr/local/Cellar/python/[...不同路径...]/streams.py:86>>
所以调用task.cancel(); await task.wait_cancelled()这个任务是没有效果的。
如何编写这个单元测试以干净地启动并为每个测试启动服务器而不切断可能仍在运行的任务?
这是例子:
测试服务器.py
import pytest
import asyncio
@pytest.fixture
async def server(event_loop):
from server import Server
the_server = Server()
await the_server.start()
yield the_server
the_server.stop()
@pytest.mark.asyncio
async def test_connect(server):
loop = asyncio.get_event_loop()
reader, writer = await asyncio.open_connection('0.0.0.0', 8888, loop = loop)
writer.write(b'something')
await reader.read(100)
writer.write(b'something else')
await reader.read(100)
assert 1
Run Code Online (Sandbox Code Playgroud)
服务器.py
import asyncio
class Server():
async def start(self):
loop = asyncio.get_event_loop()
coro = asyncio.start_server(self.new_client, '0.0.0.0', 8888, loop = loop)
task = loop.create_task(coro)
print('\n')
print(task)
self.server = await task
def stop(self):
self.server.close()
async def new_client(self, reader, writer):
await self.client_loop(reader, writer)
async def client_loop(self, reader, writer):
while True:
await reader.read(100)
writer.write(b'reply')
Run Code Online (Sandbox Code Playgroud)
如果你想运行这个例子,只需运行pip3 install pytest-asynciopytest 就可以获取这个插件。
你必须await self.server.wait_closed()在打电话之后self.server.close()。
因此,你的装置应该如下所示:
@pytest.fixture
async def server(event_loop):
from server import Server
the_server = Server()
await the_server.start()
yield the_server
await the_server.stop()
Run Code Online (Sandbox Code Playgroud)
stop你的方法应该Server是这样的:
async def stop(self):
self.server.close()
await self.server.wait_closed()
Run Code Online (Sandbox Code Playgroud)
有关详细信息,请参阅文档。