我有一个预先存在的mysql数据库,包含大约50个表.
而不是为每个表手动编写一个声明式样式SqlAlchemy类(如此处所示),是否有一个工具/脚本/命令我可以针对mysql数据库运行,它将为数据库中的每个表生成声明式样式的python类?
仅以一个表为例(将为所有50个理想情况生成),如下所示:
+---------+--------------------+
| dept_no | dept_name |
+---------+--------------------+
| d009 | Customer Service |
| d005 | Development |
| d002 | Finance |
| d003 | Human Resources |
| d001 | Marketing |
| d004 | Production |
| d006 | Quality Management |
| d008 | Research |
| d007 | Sales |
+---------+--------------------+
Run Code Online (Sandbox Code Playgroud)
是否有工具/脚本/命令可以生成包含以下内容的文本文件:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Department(Base):
__tablename__ = 'departments'
dept_no = Column(String(5), primary_key=True) …Run Code Online (Sandbox Code Playgroud) 我有一组我想要处理的.csv文件.使用SQL查询处理它会容易得多.我想知道是否有某种方法可以加载.csv文件并使用SQL语言来使用像python或ruby这样的脚本语言来查看它.使用类似于ActiveRecord的东西加载它会很棒.
问题是我不想在运行脚本之前在某处运行数据库.我不需要在脚本语言和一些模块之外进行额外的安装.
我的问题是我应该使用哪种语言和哪些模块来完成这项任务.我环顾四周,找不到任何适合我需要的东西.它甚至可能吗?
我有以下查询来检索单列数据:
routes_query = select(
[schema.stop_times.c.route_number],
schema.stop_times.c.stop_id == stop_id
).distinct(schema.stop_times.c.route_number)
result = conn.execute(routes_query)
return [r['route_number'] for r in result]
Run Code Online (Sandbox Code Playgroud)
我想知道是否有一种更清晰的方法来检索返回的数据行的本机列表.
我出于性能原因使用这个sql
sql_tmpl = """delete from Data where id_data in (:iddata) """
params = {
'iddata':[1, 2,3 4],
}
# session is a session object from sqlalchemy
self.session.execute(text(sql_tmpl), params)
Run Code Online (Sandbox Code Playgroud)
但是我得到了一个例外
NotSupportedError: (NotSupportedError) ('Python type list not supported. param=1', 'HY097')
Run Code Online (Sandbox Code Playgroud)
是否有任何解决方法可以允许我将列表绑定到'in'子句的参数?
我有一个表'test',其列'Name'没有约束.ALTER通过给它一个UNIQUE约束我需要这个专栏.我该怎么办?
我应该使用op.alter_column('???')或create_unique_constraint('???')?新列不是create_unique_constraint而不是现有列吗?
在Flask-SQLAlchemy教程中,定义了User模型的构造函数:
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)
def __init__(self, username, email):
self.username = username
self.email = email
Run Code Online (Sandbox Code Playgroud)
对于有两列的表,这可能是可以接受的,但如果我有10列以上的表怎么办?每次定义新模型时都必须定义构造函数吗?
dict的关键名称映射到sqlalchemy对象attrs
例如:
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
fullname = Column(String)
password = Column(String)
Run Code Online (Sandbox Code Playgroud)
可以从id = 3 {name: "diana"}或id = 15更新,{name: "marchel", fullname: "richie marchel"}
我已经搜索了很长时间,但无法找到问题的解决方案.我们将SQLAlchemy与MySQL结合用于我们的项目,我们遇到了几次可怕的错误:
1213,'试图锁定时发现死锁; 尝试重启事务'.
在这种情况下,我们想尝试最多重启三次交易.
我已经开始编写一个装饰器来执行此操作,但我不知道如何在失败之前保存会话状态并在之后重试相同的事务?(因为SQLAlchemy在引发异常时需要回滚)
我到目前为止的工作,
def retry_on_deadlock_decorator(func):
lock_messages_error = ['Deadlock found', 'Lock wait timeout exceeded']
@wraps(func)
def wrapper(*args, **kwargs):
attempt_count = 0
while attempt_count < settings.MAXIMUM_RETRY_ON_DEADLOCK:
try:
return func(*args, **kwargs)
except OperationalError as e:
if any(msg in e.message for msg in lock_messages_error) \
and attempt_count <= settings.MAXIMUM_RETRY_ON_DEADLOCK:
logger.error('Deadlock detected. Trying sql transaction once more. Attempts count: %s'
% (attempt_count + 1))
else:
raise
attempt_count += 1
return wrapper
Run Code Online (Sandbox Code Playgroud) 假设我有一个Postgres数据库(9.3)并且有一个名为的表Resources.在Resources表中,我有id一个int data类型的字段,它是一个JSON类型.
假设我在表中有以下记录.
我想要做的是编写一个查询,返回所有记录,其中数据列有一个json元素,其lastname等于"Doe"
我试着写这样的东西:
records = db_session.query(Resource).filter(Resources.data->>'lastname' == "Doe").all()
Run Code Online (Sandbox Code Playgroud)
然而,Pycharm在" - >>"上给出了编译错误
有谁知道如何编写filter子句来做我需要的东西?
我有一个反应器从RabbitMQ代理获取消息并触发工作方法在进程池中处理这些消息,如下所示:
这是使用python实现的asyncio,loop.run_in_executor()和concurrent.futures.ProcessPoolExecutor.
现在我想使用SQLAlchemy访问worker方法中的数据库.大多数情况下,处理将是非常简单和快速的CRUD操作.
反应器在开始时每秒处理10-50条消息,因此不能为每个请求打开新的数据库连接.相反,我想在每个进程中维护一个持久连接.
我的问题是:我怎么能这样做?我可以将它们存储在全局变量中吗?SQA连接池是否会为我处理这个问题?当反应堆停止时如何清理?
[更新]
为什么选择带有进程池的模式?
当前实现使用不同的模式,其中每个使用者在其自己的线程中运行.不知何故,这不是很好.已经有大约200个消费者在他们自己的线程中运行,并且系统正在快速增长.为了更好地扩展,我们的想法是分离关注点并在I/O循环中使用消息并将处理委托给池.当然,整个系统的性能主要是I/O绑定.但是,处理大型结果集时CPU是一个问题.
另一个原因是"易用性".虽然消息的连接处理和消耗是异步实现的,但是worker中的代码可以是同步且简单的.
很快,很明显,通过工作者内部的持久网络连接访问远程系统是一个问题.这就是CommunicationChannels的用途:在worker中,我可以通过这些通道向消息总线发出请求.
我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库.但是,我不知道如何使用SQLAlchemy执行此操作.入口点在哪里?对象需要pickled在它们通过队列时传递.如何从SQA查询中获取此类对象?与数据库的通信必须异步工作,以免阻塞事件循环.我可以使用例如aiomysql作为SQA的数据库驱动程序吗?
python sqlalchemy rabbitmq python-asyncio python-multiprocessing