我正在使用 async_engine。当我尝试执行任何操作时:
async with self.async_engine.connect() as con:
query = "SELECT id, name FROM item LIMIT 50;"
result = await con.execute(f"{query}")
Run Code Online (Sandbox Code Playgroud)
我越来越:
Exception has occurred: ObjectNotExecutableError
Not an executable object: 'SELECT id, name FROM item LIMIT 50;'
Run Code Online (Sandbox Code Playgroud)
这个问题之前由用户@stilmaniac提出过,但现在已从SO 中删除。
我在谷歌搜索缓存中找到了它,这里是副本。
我有同样的问题,所以我重新询问它,但原始版本如下:
我正在尝试从元数据创建表,如下所示:
Base = declarative_base()
properties = Table(
'properties', Base.metadata,
# ...
Column('geolocation', Geography(geometry_type='POINT', srid=4326)),
# ...
)
engine = create_async_engine("postgresql+asyncpg://user:password@postgres/")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
Run Code Online (Sandbox Code Playgroud)
给我以下错误:
sqlalchemy.exc.ObjectNotExecutableError: Not an executable object: 'CREATE INDEX …
Run Code Online (Sandbox Code Playgroud) 我正在尝试解决以下错误:
\nasyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress\n
Run Code Online (Sandbox Code Playgroud)\n这是完整的回溯:
\nTraceback (most recent call last):\n\n File "<string>", line 1, in <module>\n File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main\n exitcode = _main(fd, parent_sentinel)\n \xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94 4\n \xe2\x94\x82 \xe2\x94\x94 7\n \xe2\x94\x94 <function _main at 0x109c8aca0>\n File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/spawn.py", line 129, in _main\n return self._bootstrap(parent_sentinel)\n \xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94 4\n \xe2\x94\x82 \xe2\x94\x94 <function BaseProcess._bootstrap at 0x109b1f8b0>\n \xe2\x94\x94 <SpawnProcess name='SpawnProcess-4' parent=36604 started>\n File "/Users/ddd/.pyenv/versions/3.8.6/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap\n self.run()\n \xe2\x94\x82 \xe2\x94\x94 <function …
Run Code Online (Sandbox Code Playgroud) import asyncio
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
echo=True,
)
# expire_on_commit=False will prevent attributes from being expired
# after commit.
async_session = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
Base = …
Run Code Online (Sandbox Code Playgroud) 想象一下一个异步aiohttp
Web 应用程序,它由通过连接的 Postgresql 数据库支持asyncpg
,并且不执行其他 I/O。我怎样才能有一个托管应用程序逻辑的中间层,而不是异步的?(我知道我可以简单地使所有内容异步 - 但想象我的应用程序具有大量应用程序逻辑,仅受数据库 I/O 约束,并且我无法触及其中的所有内容)。
伪代码:
\nasync def handler(request):\n # call into layers over layers of application code, that simply emits SQL\n ...\n\ndef application_logic():\n ...\n # This doesn\'t work, obviously, as await is a syntax\n # error inside synchronous code.\n data = await asyncpg_conn.execute("SQL")\n ...\n # What I want is this:\n data = asyncpg_facade.execute("SQL")\n ...\n
Run Code Online (Sandbox Code Playgroud)\n如何asyncpg
构建同步 fa\xc3\xa7ade 来允许应用程序逻辑进行数据库调用?async.run()
在这种情况下,诸如 using或等浮动的配方asyncio.run_coroutine_threadsafe()
不起作用,因为我们来自已经异步的上下文。我认为这不可能是不可能的,因为已经有一个事件循环原则上可以运行协 …
除了我的 python bot 之外,我还使用 Heroku 设置了一个 postgresql 服务器,它也在 heroku 上运行,但该 bot 无法连接到数据库
我确保密码用户名等正确。
这是用于连接的方法:
async def create_db_pool():
bot.pg_con = await asyncpg.create_pool(database="dbname",
user="username",
password="dbpw")
Run Code Online (Sandbox Code Playgroud)
这就是我运行它的方式:
bot.loop.run_until_complete(create_db_pool())
Run Code Online (Sandbox Code Playgroud)
预计访问数据库并写入和读取数据,而不是我收到以下错误:
asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
Task was destroyed but it is pending!
task: <Task pending coro=<chng_pr() running at I:/Python/HardCoreDisBot/Commands.py:38> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000002571E9B1978>()]>>
Run Code Online (Sandbox Code Playgroud) 我最近将使用 FastApi 编码的 REST API 迁移到新的 SQLAlchemy 1.4+ 异步版本。我的应用程序编译正确,数据库似乎设置得很好。当我尝试执行多个请求时,会出现问题,出现错误,似乎表明同一会话正在用于我的所有请求。我把错误信息放在最后
这是我的代码,我基于 SQLAlchemy 异步文档和此示例
应用程序引擎初始化
from typing import AsyncIterator
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from .notification import Notification
from .devices import Device
from sqlalchemy import MetaData
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import create_engine
from app.core.config import Config
from app.core.constants import logger
import asyncio
engine = create_async_engine(Config.RDS_DB_URL)
metadata = MetaData(engine)
#if not engine.dialect.has_table(engine, C.NOTIFICATIONS):
#Base.metadata.create_all(engine)
async def init_connection():
async with engine.begin() as conn:
await …
Run Code Online (Sandbox Code Playgroud) 我正在使用nsidnev/fastapi-realworld-example-app。
我需要将事务逻辑应用到这个项目中。
在一个 API 中,我从存储库调用许多方法,并在许多表中执行更新、插入和删除操作。如果这些操作出现异常,我该如何回滚更改?(或者如果一切正确则提交。)
我正在使用 sqlalchemy + asyncpg 和“选择”急切加载。
我有与朋友有一对多关系的个人项目。
我将一个人插入到我的数据库中,但没有相关的朋友条目。如果在同一个会话中我尝试从数据库中获取该人,我可以很好地访问他们的静态(非关系)列,但无法访问关系friends
。
我认为尝试访问person.friends
会触发延迟加载,尽管它之前是作为selectin
加载强制执行的。为什么是这样?我怎样才能避免它?
# Create the ORM model
class Person(Base):
__tablename__ = 'items'
id_ = Column(POSTGRES_UUID(as_uuid=True), primary_key=True)
name = Column(String(32))
friends = relationship('Friend', lazy='selectin')
# Create an instance
person_id = uuid4()
person = Person(id_=person_id, name='Alice') # Note that this Person's friends are not set
# Add to database
async with AsyncSession(engine, expire_on_commit=False) as session:
try:
session.begin()
session.add(person)
await session.commit()
except:
await session.rollback()
raise
# Get the added person from …
Run Code Online (Sandbox Code Playgroud) 我正在从 SQLAlchemy 迁移到 SQLAlchemy[async],使用 postgresql 作为我的数据库。我正在关注 SQL Alchemy 文档https://docs.sqlalchemy.org/en/14/_modules/examples/asyncio/async_orm.html
对代码进行所有更改后,我的测试变得非常慢
我添加了一堆日志以便打印丢失时间的位置。我也进行了分析,但 cProfile 没有显示任何相关内容。
仅在插入后我才在代码中出现这种延迟。在我分享的测试中,我先执行插入,然后执行检索,并且它仅在会话与操作一起使用后发生add
。
engine = create_async_engine(DATABASE_URL,
pool_size=20,
max_overflow=2,
pool_recycle=300,
pool_pre_ping=True,
pool_use_lifo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
@asynccontextmanager
async def session_scope() -> AsyncSession:
"""
Context manager for API DB Sessions
"""
try:
async with async_session() as session:
await session.begin()
logger.info('Yielding session')
yield session
logger.info('Returned session')
logger.info('Pre commit')
await session.commit()
logger.info('Commit finished')
except Exception as exc: # pylint: disable=W0703
logger.exception('Exception on session')
await session.rollback()
raise exc
Run Code Online (Sandbox Code Playgroud)
这是我的客户
async …
Run Code Online (Sandbox Code Playgroud) 我想测试一些与asyncpg
. 如果我一次运行一个测试,效果很好。但是,如果我一次运行多个测试,则除第一个测试之外的所有测试都会因错误而崩溃asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
。
测试:
\n@pytest.mark.asyncio\nasync def test_project_connection(superuser_id, project_id):\n data = element_data_random(project_id)\n\n element_id = (await resolve_element_create(data=data, user_id=superuser_id))["id"]\n project_elements = (await db_projects_element_ids_get([project_id]))[project_id]\n\n assert element_id in project_elements\n\n\n@pytest.mark.asyncio\nasync def test_project_does_not_exist(superuser_id):\n data = element_data_random(str(uuid.uuid4()))\n\n with pytest.raises(ObjectWithIdDoesNotExistError):\n await resolve_element_create(data=data, user_id=superuser_id)\n
Run Code Online (Sandbox Code Playgroud)\n使用数据库使用池的所有函数如下所示:
\nasync def <some_db_func>(*args):\n pool = await get_pool()\n\n await pool.execute(...) # or fetch/fetchrow/fetchval\n
Run Code Online (Sandbox Code Playgroud)\n我如何获得池:
\ndb_pool = None\n\n\nasync def get_pool():\n global db_pool\n\n async def init(con):\n await con.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog')\n …
Run Code Online (Sandbox Code Playgroud) asyncpg ×10
python ×7
sqlalchemy ×5
fastapi ×4
asynchronous ×3
python-3.x ×2
aiohttp ×1
async-await ×1
database ×1
heroku ×1
postgresql ×1
pytest ×1