我有一个带有异步 sqlalchemy 的异步 FastApi 应用程序,源代码(不会提供 schemas.py,因为没有必要):
from sqlalchemy import (
Column,
String,
)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.decl_api import DeclarativeMeta
from app.config import settings
engine = create_async_engine(settings.DATABASE_URL)
Base: DeclarativeMeta = declarative_base()
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Titles(Base):
__tablename__ = "titles"
id = Column(String(100), primary_key=True)
title = Column(String(100), unique=True)
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
Run Code Online (Sandbox Code Playgroud)
import .database
from fastapi_utils.cbv import cbv
from …
Run Code Online (Sandbox Code Playgroud) 我有一个服务网络套接字的类,并听 PostgreSQL。使用 asyncpg,当我尝试使用 add_listener 时,出现错误:RuntimeWarning: coroutine was never awaited。如何异步/等待回调。我尝试添加“等待 self.listener”,但它不起作用。
有没有办法以另一种方式处理这个问题?
import asyncio
import http
import websockets
import asyncpg
class App(object):
def __init__(self, loop):
self.loop = loop
self.ws_list = []
self.conn = None
async def ws_handler(self, ws, path):
if self.conn is None:
self.conn = await asyncpg.connect(user='xxx', password='xxx', database='pgws', host='127.0.0.1')
await self.conn.add_listener('todo_updates', self.listener)
print('new socket!!!')
self.ws_list.append(ws)
while True:
await asyncio.sleep(1)
async def listener(self, conn, pid, channel, payload):
print(payload)
for ws in self.ws_list:
task = asyncio.create_task()
await ws.send(payload)
if __name__ == "__main__": …
Run Code Online (Sandbox Code Playgroud) 以下代码抛出“sqlalchemy.exc.CompileError:未使用的列名称:_id”。
User = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('score', Integer)
)
values = [
{'score': 2, '_id': 1},
{'score': 3, '_id': 3}
]
query = User.update().where(User.c.id == bindparam('_id')).values(score=bindparam('score'))
await db.execute_many(query, values)
Run Code Online (Sandbox Code Playgroud)
db
是数据库的一个实例。Database
。请注意,我必须使用名称“_id”,因为 SQLalchemy 说“id”是保留的。
除了单独更新每一行之外还有其他解决方案吗?
Recently, I am trying to use encode/databases packages with my postgres db. I have done all correctly. But when I go to run it with uvicorn, I see that it shows me that psycopg2 is not installed. I do not wan to use psycopg2. So, I restated all and run databases[postgresql]
instead of databases
only. I also installed asyncpg
. But I am still seeing the same err. I was also doing everything in a virtual environment. My code is …
我制作了一个小应用程序,用于SQLAlchemy
处理与 postgresql 数据库的连接。现在我想用asincio重写它。由于某种原因,当我运行它时,出现以下错误:
Traceback (most recent call last):
File "D:\Space\discord_count_bot\bot\bot\main.py", line 12, in <module>
dbConnection.init_connection(
File "D:\Space\discord_count_bot\bot\bot\db_hanler.py", line 78, in init_connection
engine = create_async_engine(connection_string, future=True, echo=True)
File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 40, in create_async_engine
sync_engine = _create_engine(*arg, **kw)
File "<string>", line 2, in create_engine
File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\util\deprecations.py", line 298, in warned
return fn(*args, **kwargs)
File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\engine\create.py", line 560, in create_engine
dbapi = dialect_cls.dbapi(**dbapi_args)
File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\dialects\postgresql\psycopg2.py", line 782, in dbapi
import psycopg2
ModuleNotFoundError: No module named 'psycopg2'
Run Code Online (Sandbox Code Playgroud)
如果psycopg2
安装了,我得到
Traceback (most …
Run Code Online (Sandbox Code Playgroud) 我想使用 Asyncpg 在表中插入一些 json 数据(2 列: id 、 cluster_json )。我想使用“executemany”函数来加速插入过程。
我的代码:
async def main():
conn = await asyncpg.connect('postgresql://postgres:postgres@localhost:5432/postgres')
statement = '''INSERT INTO cluster(cluster_json) VALUES($1) '''
await conn.executemany(statement, [{"name":"John", "age":30, "car":null},
{"name":"John1", "age":31, "car":null}'])
await conn.close()
asyncio.get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
asyncpg.exceptions.DataError: invalid input in executemany() argument sequence element #0: expected a sequence, got dict
Run Code Online (Sandbox Code Playgroud)
我试图将字典作为 str 传递。也有一个错误。
错误消息很清楚,代码与文档中的代码非常相似,
期望我想要插入 json 数据。不幸的是,我没有看到我错过了什么。有人发现问题/帮助我吗?提前致谢。
我想做这样的事情:
async with app.pg_pool.acquire() as pg:
uid = await pg.execute('INSERT INTO users (created, keyed, key, email) '
'VALUES ($1, $2, $3, $4) RETURNING id',
time, time, key, rj['email'])['id']
Run Code Online (Sandbox Code Playgroud)
但是Connection.execute
除了状态之外似乎没有返回任何内容:
这个问题可以换句话说:RETURNING
使用 asyncpg 时如何获取语句的响应?
有没有办法像在 Hibernate 中一样使用 asyncpg 在 PostgreSQL 服务器中创建数据库,使用:
jdbc:mysql://localhost:3306/db_name?createDatabaseIfNotExist=true
Run Code Online (Sandbox Code Playgroud)
uri 中的标志。有没有一种方法可以在不连接数据库的情况下连接到服务器并执行sql语句?
我想在 PostgreSQL 中使用异步查询。我知道许多编程语言中都存在一些异步驱动程序,即。asyncg (Python)、vertx-postgress (Java) 等等。
我是否需要以某种方式配置 PostgreSQL 才能使用异步功能?中有一个“异步行为”部分postgesql.conf
。我是否需要取消注释并编辑这些值才能以异步方式最佳地使用 PostgreSQL?
在我使用 asyncpg 的 python 代码中,我将元组 ('PENDING',) 传递到 where-in 查询中,该查询记录为:
args=('TYPE_1', ('PENDING',))
query=SELECT * FROM actions where type = $1 AND status IN $2
Run Code Online (Sandbox Code Playgroud)
似乎 sql 查询最终应该是
SELECT * FROM actions where type = TYPE_1 AND status in ('PENDING',);
Run Code Online (Sandbox Code Playgroud)
但上面的代码导致:
asyncpg.exceptions.PostgresSyntaxError: syntax error at or near "$2"
Run Code Online (Sandbox Code Playgroud)
我认为这可能是因为元组中的尾随逗号,但我不知道如何摆脱它..
我正在尝试python:3-8.alpine
使用 python 模块构建基于 的图像asyncpg
。
这是我的 Dockerfile 的一部分:
FROM python:3.8-alpine
RUN apk add gcc
RUN apk add python3-dev
RUN pip3 install asyncpg
Run Code Online (Sandbox Code Playgroud)
我补充说gcc
,python3-dev
因为我认为我需要它们能够asyncpg
根据文档进行构建:https : //magicstack.github.io/asyncpg/current/installation.html(但我不确定,我想我应该无需构建此模块即可安装)
但是在构建图像时出现以下错误:
gcc -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -DTHREAD_STACK_SIZE=0x100000 -fPIC -I/usr/local/include/python3.8 -c asyncpg/pgproto/pgproto.c -o build/temp.linux-x86_64-3.8/asyncpg/pgproto/pgproto.o -O2 -fsigned-char -Wall -Wsign-compare -Wconversion
In file included from asyncpg/pgproto/pgproto.c:29:
/usr/local/include/python3.8/Python.h:11:10: fatal error: limits.h: No such file or directory
11 | #include <limits.h>
| ^~~~~~~~~~
compilation terminated.
error: …
Run Code Online (Sandbox Code Playgroud) asyncpg ×12
python ×10
postgresql ×6
sqlalchemy ×3
asynchronous ×2
database ×2
fastapi ×2
psycopg2 ×2
alpine-linux ×1
docker ×1
python-3.x ×1
sql ×1
uvicorn ×1