小编hay*_*ypo的帖子

py.test混合装置和异步协程

我正在使用py.test为python3代码构建一些测试。该代码使用aiopg(到Postgres的基于Asyncio的接口)访问Postgresql数据库。

我的主要期望:

  • 每个测试用例都应该可以访问新的asyncio事件循环。

  • 运行时间过长的测试将因超时异常而停止。

  • 每个测试用例都应该有权访问数据库连接。

  • 在编写测试用例时,我不想重复自己。

使用py.test固定装置,我可以很接近我想要的东西,但是在每个异步测试用例中,我仍然不得不重复自己一遍。

这是我的代码的样子:

@pytest.fixture(scope='function')
def tloop(request):
    # This fixture is responsible for getting a new event loop
    # for every test, and close it when the test ends.
    ...

def run_timeout(cor,loop,timeout=ASYNC_TEST_TIMEOUT):
    """
    Run a given coroutine with timeout.
    """
    task_with_timeout = asyncio.wait_for(cor,timeout)
    try:
        loop.run_until_complete(task_with_timeout)
    except futures.TimeoutError:
        # Timeout:
        raise ExceptAsyncTestTimeout()


@pytest.fixture(scope='module')
def clean_test_db(request):
    # Empty the test database.
    ...

@pytest.fixture(scope='function')
def udb(request,clean_test_db,tloop):
    # Obtain a connection to the database using aiopg
    # (That's why we …
Run Code Online (Sandbox Code Playgroud)

postgresql fixtures pytest python-decorators python-asyncio

4
推荐指数
1
解决办法
1849
查看次数