sti*_*dcl 5 python sqlalchemy pytest alembic
现有的帖子没有给我提供有用的答案。
我正在尝试使用 Pytest 运行异步数据库测试(db 是带有 asyncpg 的 Postgres),并且我想使用 Alembic 迁移来初始化我的数据库,以便我可以同时验证它们是否正常工作。
我的第一次尝试是这样的:
@pytest.fixture(scope="session")
async def tables():
"""Initialize a database before the tests, and then tear it down again"""
alembic_config: config.Config = config.Config('alembic.ini')
command.upgrade(alembic_config, "head")
yield
command.downgrade(alembic_config, "base")
Run Code Online (Sandbox Code Playgroud)
它实际上根本没有做任何事情(迁移从未应用于数据库,未创建表)。
Alembic 的文档和 Pytest-Alembic 的文档都说异步迁移应该通过env
如下配置来运行:
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
asyncio.run(run_migrations_online())
Run Code Online (Sandbox Code Playgroud)
但这并不能解决问题(但是它确实适用于 pytest 之外的生产迁移)。
我偶然发现了一个名为 的库pytest-alembic
,它为此提供了一些内置测试。
运行时pytest --test-alembic
,出现以下异常:
将 Future 连接到不同的循环
GitHub 存储库上的一些评论pytest-asyncio
表明以下固定装置可能会修复它:
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
asyncio.run(run_migrations_online())
Run Code Online (Sandbox Code Playgroud)
但事实并非如此(同样的例外仍然存在)。
接下来我尝试upgrade
使用以下命令手动运行测试:
@pytest.fixture(scope="session")
def event_loop() -> Generator:
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
Run Code Online (Sandbox Code Playgroud)
这给了我
alembic_runner.migrate_up_to("revision_tag_here")
venv/lib/python3.9/site-packages/pytest_alembic/runner.py:264:在 run_connection_task 中返回 asyncio.run(run(engine))
RuntimeError:无法从正在运行的事件循环调用 asyncio.run()
然而,这是 的内部调用,pytest-alembic
我不会调用asyncio.run()
自己,因此我无法为此应用任何在线修复(try-catch
ing 检查是否有现有的事件循环可供使用等)。我确信这与我自己 asyncio.run()
在 中定义的无关alembic env
,因为如果我添加一个断点 - 或者只是在其上方引发一个异常 - 该行实际上永远不会被执行。
最后,我也尝试过nest-asyncio.apply()
,它永远挂起。
还有一些博客文章建议使用此装置来初始化数据库表以进行测试:
async def test_migrations(alembic_runner):
alembic_runner.migrate_up_to("revision_tag_here")
Run Code Online (Sandbox Code Playgroud)
它的目的是创建一个数据库来运行测试,但这不会通过迁移运行,所以这对我的情况没有帮助。
我觉得我已经尝试了所有方法并访问了每个文档页面,但到目前为止我还没有运气。运行异步迁移测试肯定不会这么困难吗?
如果需要任何额外信息,我很乐意提供。
我通过以下命令很容易地启动并运行它
env.py
- 这里的主要思想是迁移可以同步运行
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = mymodel.Base.metadata
def run_migrations_online():
connectable = context.config.attributes.get("connection", None)
if connectable is None:
connectable = AsyncEngine(
engine_from_config(
context.config.get_section(context.config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True
)
)
if isinstance(connectable, AsyncEngine):
asyncio.run(run_async_migrations(connectable))
else:
do_run_migrations(connectable)
async def run_async_migrations(connectable):
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
run_migrations_online()
Run Code Online (Sandbox Code Playgroud)
然后我添加了一个简单的数据库初始化脚本
init_db.py
from alembic import command
from alembic.config import Config
from sqlalchemy.ext.asyncio import create_async_engine
__config_path__ = "/path/to/alembic.ini"
__migration_path__ = "/path/to/folder/with/env.py"
cfg = Config(__config_path__)
cfg.set_main_option("script_location", __migration_path__)
async def migrate_db(conn_url: str):
async_engine = create_async_engine(conn_url, echo=True)
async with async_engine.begin() as conn:
await conn.run_sync(__execute_upgrade)
def __execute_upgrade(connection):
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")
Run Code Online (Sandbox Code Playgroud)
那么你的 pytest 装置可能看起来像这样
conftest.py
...
@pytest_asyncio.fixture(autouse=True)
async def migrate():
await migrate_db(conn_url)
yield
...
Run Code Online (Sandbox Code Playgroud)
注意:我不会将迁移固定装置限制在测试会话中,我倾向于在每次测试后删除并迁移。
归档时间: |
|
查看次数: |
6180 次 |
最近记录: |