Pytest Alembic 使用异步迁移初始化数据库

sti*_*dcl 5 python sqlalchemy pytest alembic

现有的帖子没有给我提供有用的答案。

我正在尝试使用 Pytest 运行异步数据库测试(db 是带有 asyncpg 的 Postgres),并且我想使用 Alembic 迁移来初始化我的数据库,以便我可以同时验证它们是否正常工作。

我的第一次尝试是这样的:

@pytest.fixture(scope="session")
async def tables():
    """Initialize a database before the tests, and then tear it down again"""
    alembic_config: config.Config = config.Config('alembic.ini')
    command.upgrade(alembic_config, "head")
    yield
    command.downgrade(alembic_config, "base")
Run Code Online (Sandbox Code Playgroud)

它实际上根本没有做任何事情(迁移从未应用于数据库,未创建表)。

Alembic 的文档和 Pytest-Alembic 的文档都说异步迁移应该通过env如下配置来运行:

async def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

asyncio.run(run_migrations_online())
Run Code Online (Sandbox Code Playgroud)

但这并不能解决问题(但是它确实适用于 pytest 之外的生产迁移)。

我偶然发现了一个名为 的库pytest-alembic,它为此提供了一些内置测试。

运行时pytest --test-alembic,出现以下异常:

将 Future 连接到不同的循环

GitHub 存储库上的一些评论pytest-asyncio表明以下固定装置可能会修复它:

async def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

asyncio.run(run_migrations_online())
Run Code Online (Sandbox Code Playgroud)

但事实并非如此(同样的例外仍然存在)。

接下来我尝试upgrade使用以下命令手动运行测试:

@pytest.fixture(scope="session")
def event_loop() -> Generator:
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
Run Code Online (Sandbox Code Playgroud)

这给了我

alembic_runner.migrate_up_to("revision_tag_here")

venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264:在 run_connection_task 中返回 asyncio.run(run(engine))

RuntimeError:无法从正在运行的事件循环调用 asyncio.run()

然而,这是 的内部调用pytest-alembic我不会调用asyncio.run()自己,因此我无法为此应用任何在线修复(try-catching 检查是否有现有的事件循环可供使用等)。我确信这与我自己 asyncio.run()在 中定义的无关alembic env,因为如果我添加一个断点 - 或者只是在其上方引发一个异常 - 该行实际上永远不会被执行

最后,我也尝试过nest-asyncio.apply(),它永远挂起。

还有一些博客文章建议使用此装置来初始化数据库表以进行测试:

async def test_migrations(alembic_runner):
    alembic_runner.migrate_up_to("revision_tag_here")
Run Code Online (Sandbox Code Playgroud)

它的目的是创建一个数据库来运行测试,但这不会通过迁移运行,所以这对我的情况没有帮助。

我觉得我已经尝试了所有方法并访问了每个文档页面,但到目前为止我还没有运气。运行异步迁移测试肯定不会这么困难吗?

如果需要任何额外信息,我很乐意提供。

LeR*_*Roi 9

我通过以下命令很容易地启动并运行它

env.py - 这里的主要思想是迁移可以同步运行

import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine

config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = mymodel.Base.metadata


def run_migrations_online():
    connectable = context.config.attributes.get("connection", None)
    if connectable is None:
        connectable = AsyncEngine(
            engine_from_config(
                context.config.get_section(context.config.config_ini_section),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
                future=True
            )
        )

    if isinstance(connectable, AsyncEngine):
        asyncio.run(run_async_migrations(connectable))
    else:
        do_run_migrations(connectable)


async def run_async_migrations(connectable):
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )
    with context.begin_transaction():
        context.run_migrations()


run_migrations_online()
Run Code Online (Sandbox Code Playgroud)

然后我添加了一个简单的数据库初始化脚本
init_db.py

from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine

__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"

cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)


async def migrate_db(conn_url: str):
    async_engine = create_async_engine(conn_url, echo=True)
    async with async_engine.begin() as conn:
        await conn.run_sync(__execute_upgrade)


def __execute_upgrade(connection):
    cfg.attributes["connection"] = connection
    command.upgrade(cfg, "head")

Run Code Online (Sandbox Code Playgroud)

那么你的 pytest 装置可能看起来像这样
conftest.py

...

@pytest_asyncio.fixture(autouse=True)
async def migrate():
    await migrate_db(conn_url)
    yield

...
Run Code Online (Sandbox Code Playgroud)

注意:我不会将迁移固定装置限制在测试会话中,我倾向于在每次测试后删除并迁移。