pytest 与会话范围的固定装置和 asyncio 相关的问题

Uri*_*Uri 8 python semaphore pytest python-asyncio pytest-xdist

我有多个测试文件,每个文件都有一个异步夹具,如下所示:


@pytest.fixture(scope="module")
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="module")
async def some_fixture():
    return await make_fixture()
Run Code Online (Sandbox Code Playgroud)

我正在使用 xdist 进行并行化。另外我有这个装饰器:

@toolz.curry
def throttle(limit, f):
    semaphore = asyncio.Semaphore(limit)

    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        async with semaphore:
            return await f(*args, **kwargs)

    return wrapped
Run Code Online (Sandbox Code Playgroud)

我有一个函数使用它:

@throttle(10)
def f():
    ...
Run Code Online (Sandbox Code Playgroud)

现在f正在从多个测试文件调用,并且我收到一个异常,告诉我无法使用不同事件循环中的信号量。

我尝试转向会话级事件循环装置:



@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

Run Code Online (Sandbox Code Playgroud)

但这只给了我:

ScopeMismatch:您尝试使用“模块”范围请求对象访问“函数”范围固定装置“event_loop”,涉及工厂

是否有可能让 xdist + 异步装置 + 信号量一起工作?

Uri*_*Uri 5

最终使用以下方法让它工作conftest.py

import asyncio

import pytest


@pytest.fixture(scope="session")
def event_loop():
    return asyncio.get_event_loop()
Run Code Online (Sandbox Code Playgroud)