我想设置一个基于SQLAlchemy模型中另一个表的列默认值.
目前我有这个:
Column('version', Integer, default=1)
Run Code Online (Sandbox Code Playgroud)
我需要的是(大致)这个:
Column('version', Integer, default="SELECT MAX(1, MAX(old_versions)) FROM version_table")
Run Code Online (Sandbox Code Playgroud)
我如何在SQLAlchemy中实现它?
我的应用程序使用范围会话和SQLALchemy的声明式样式.它是一个Web应用程序,许多数据库插入由Celery任务调度程序执行.
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
Run Code Online (Sandbox Code Playgroud)
这里的问题是,由于很多,这是通过异步工人完成的,它是可能的一个工作是,虽然中途插入Bike有id=123,而另一个正在检查它的存在.在这种情况下,第二个worker将尝试插入一个具有相同主键的行,SQLAlchemy将引发一个IntegrityError.
我不能为我的生活找到一个很好的方法来处理这个问题,除了交换Session.commit():
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in …Run Code Online (Sandbox Code Playgroud) 我试图设置一个postgresql表,它有两个指向另一个表中相同主键的外键.
当我运行脚本时,我收到错误
sqlalchemy.exc.AmbiguousForeignKeysError:无法确定关系Company.stakeholder上的父/子表之间的连接条件 - 有多个链接表的外键路径.指定'foreign_keys'参数,提供应列为包含对父表的外键引用的列的列表.
这是SQLAlchemy文档中的确切错误,但当我复制它们提供的解决方案时,错误不会消失.我能做错什么?
#The business case here is that a company can be a stakeholder in another company.
class Company(Base):
__tablename__ = 'company'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
class Stakeholder(Base):
__tablename__ = 'stakeholder'
id = Column(Integer, primary_key=True)
company_id = Column(Integer, ForeignKey('company.id'), nullable=False)
stakeholder_id = Column(Integer, ForeignKey('company.id'), nullable=False)
company = relationship("Company", foreign_keys='company_id')
stakeholder = relationship("Company", foreign_keys='stakeholder_id')
Run Code Online (Sandbox Code Playgroud)
我在这里看到过类似的问题,但有些答案建议人们primaryjoin在文档中使用a 表明你primaryjoin在这种情况下不需要.
我有要检索的ID序列.这很简单:
session.query(Record).filter(Record.id.in_(seq)).all()
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法呢?
我希望我的模型的主键是一个自动递增的整数.这是我的模型的样子
class Region(db.Model):
__tablename__ = 'regions'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(100))
parent_id = db.Column(db.Integer, db.ForeignKey('regions.id'))
parent = db.relationship('Region', remote_side=id, primaryjoin=('Region.parent_id==Region.id'), backref='sub-regions')
created_at = db.Column(db.DateTime, default=db.func.now())
deleted_at = db.Column(db.DateTime)
Run Code Online (Sandbox Code Playgroud)
上面的代码创建了我的表,但没有id自动增量.因此,如果在我的插入查询中我错过了该id字段,它会给我这个错误
错误:列"id"中的空值违反非空约束
所以我把id声明改成了这样的样子
id = db.Column(db.Integer, db.Sequence('seq_reg_id', start=1, increment=1),
primary_key=True)
Run Code Online (Sandbox Code Playgroud)
还是一样的错误.上面的代码有什么问题?
我正在尝试使用uWSGI + Nginx设置应用程序webserver,它使用SQLAlchemy运行Flask应用程序与Postgres数据库进行通信.
当我向Web服务器发出请求时,每个其他响应都将是500错误.
错误是:
Traceback (most recent call last):
File "/var/env/argos/lib/python3.3/site-packages/sqlalchemy/engine/base.py", line 867, in _execute_context
context)
File "/var/env/argos/lib/python3.3/site-packages/sqlalchemy/engine/default.py", line 388, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: decryption failed or bad record mac
The above exception was the direct cause of the following exception:
sqlalchemy.exc.OperationalError: (OperationalError) SSL error: decryption failed or bad record mac
Run Code Online (Sandbox Code Playgroud)
该错误由一个简单的Flask-SQLAlchemy方法触发:
result = models.Event.query.get(id)
Run Code Online (Sandbox Code Playgroud)
uwsgi正在管理supervisor,有一个配置:
[program:my_app]
command=/usr/bin/uwsgi --ini /etc/uwsgi/apps-enabled/myapp.ini --catch-exceptions
directory=/path/to/my/app
stopsignal=QUIT
autostart=true
autorestart=true
Run Code Online (Sandbox Code Playgroud)
和uwsgi配置看起来像:
[uwsgi]
socket = …Run Code Online (Sandbox Code Playgroud) 我们生产中的一个线程遇到了一个错误,现在正在产生InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.错误,每次请求都有一个查询它服务的余生!现在已经好几天了!这怎么可能,我们如何防止它向前发展?
我们在uWSGI上使用Flask应用程序(4个进程,2个线程),Flask-SQLAlchemy为我们提供了与SQL Server的数据库连接.
当我们的一个生产线程正在拆除它的请求时,问题似乎开始了,在这个Flask-SQLAlchemy方法中:
@teardown
def shutdown_session(response_or_exc):
if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
if response_or_exc is None:
self.session.commit()
self.session.remove()
return response_or_exc
Run Code Online (Sandbox Code Playgroud)
...... self.session.commit()当事务无效时,以某种方式设法调用.这导致sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction is rolled back输出到stdout,无视我们的日志记录配置,这是有道理的,因为它发生在应用程序上下文拆除期间,这从来不应该引发异常.我不确定如果没有response_or_exec设置,交易如何变得无效,但这实际上是AFAIK的较小问题.
更大的问题是,当"准备好的'状态"错误开始时,并且从那时起就没有停止过.每次这个线程提供命中数据库的请求时,它就是500s.每个其他线程似乎都没问题:据我所知,即使是在同一进程中的线程也没问题.
SQLAlchemy邮件列表中有一个关于"'准备好的'状态"错误的条目,说明如果会话开始提交但尚未完成,而其他东西试图使用它.我的猜测是,这个帖子中的会话永远不会self.session.remove()迈出这一步,现在它永远不会.
我仍然觉得这并没有解释这个会话如何在请求中持续存在.我们还没有修改Flask-SQLAlchemy对请求范围会话的使用,因此会话应该返回到SQLAlchemy的池并在请求结束时回滚,即使是错误的(尽管可能不是第一个,因为在应用程序上下文中引发的内容被拆除了).为什么回滚没有发生?如果我们每次都看到stdout上的"无效事务"错误(在uwsgi的日志中),我就能理解它,但我们不是:我第一次只看到它一次.但每次500秒发生时,我都会看到"准备好的状态"错误(在我们的应用程序日志中).
我们已经关闭expire_on_commit了session_options,我们已经开启了SQLALCHEMY_COMMIT_ON_TEARDOWN.我们只是从数据库中读取,而不是写作.我们还使用Dogpile-Cache进行所有查询(使用memcached锁,因为我们有多个进程,实际上是2个负载均衡的服务器).缓存会在每分钟到期,因为我们的主要查询.
重启服务器似乎已经解决了问题,这并不奇怪.也就是说,我希望能再次看到它,直到我们弄清楚如何阻止它.benselme(下面)建议编写我们自己的拆解回调,并在提交时进行异常处理,但我觉得更大的问题是该线程在其余生中被搞砸了.事实上,在一两个请求之后这并没有消失,这让我很紧张!
我对SQLAlchemy中的过滤有点困惑.
我目前正在尝试过滤超过10周的条目,所以我有
current_time = datetime.datetime.utcnow()
potential = session.query(Subject).filter(Subject.time < current_time - datetime.timedelta(weeks=10))
Run Code Online (Sandbox Code Playgroud)
但是,potential.count()总是返回0.
我的理论是,我无法正常使用过滤语句,因为当我尝试使用列,它是类型的不是Column(DateTime()),而是
柱(字符串(250))
喜欢
potential = session.query(Subject).filter(Subject.string_field < current_time - datetime.timedelta(weeks=10))
Run Code Online (Sandbox Code Playgroud)
SQLAlchemy仍然不会抱怨.
另外,当我进行手动检查时
curr_time - session.query(Subject).first().time > datetime.timedelta(weeks=10)
Run Code Online (Sandbox Code Playgroud)
我得到的True意思是计数不应该是0.
我错过了一些明显的东西吗 任何帮助,将不胜感激.
我正在使用SQLite作为我的基于PySide的桌面应用程序的应用程序文件格式(请参阅此处了解为什么要这样做).也就是说,当用户使用我的应用程序时,他们的数据将保存在其计算机上的单个数据库文件中.我正在使用SQLAlchemy ORM与数据库进行通信.
当我发布应用程序的新版本时,我可能会修改数据库架构.我不希望用户每次更改架构时都要丢弃他们的数据,所以我需要将他们的数据库迁移到最新的格式.此外,我创建了大量临时数据库来保存数据的子集,以便与某些外部进程一起使用.我想用alembic创建这些数据库,以便用正确的版本标记它们.
我有几个问题:
有没有办法从我的Python代码中调用alembic?我认为必须使用Popen纯Python模块是很奇怪的,但是文档只是从命令行使用alembic.主要是,我需要将数据库位置更改为用户数据库所在的位置.
如果这不可能,我可以从命令行指定新的数据库位置而不编辑.ini文件吗?这将使调用alembic Popen不是一件大事.
我看到,alembic将其版本信息保存在一个名为的简单表中alembic_version,其中一列被调用version_num,一行指定了版本.我可以alembic_version在我的架构中添加一个表,并在创建新数据库时使用最新版本填充它,这样就没有开销吗?这甚至是个好主意; 我应该使用alembic来创建所有数据库吗?
我的alembic非常适合我在项目目录中使用的单个数据库.我想使用alembic在任意位置方便地迁移和创建数据库,最好是通过某种Python API,而不是命令行.此应用程序也被cx_Freeze冻结,以防有所作为.
谢谢!
我有一些相当大的pandas DataFrames,我想使用新的批量SQL映射通过SQL Alchemy将它们上传到Microsoft SQL Server.pandas.to_sql方法虽然不错,但速度很慢.
我在编写代码时遇到了麻烦......
我希望能够将这个函数传递给我正在调用的pandas DataFrame,我正在调用table的模式名称schema,以及我正在调用的表名name.理想情况下,该函数将1.)删除表,如果它已经存在.2.)创建一个新表3.)创建一个mapper和4.)使用mapper和pandas数据批量插入.我被困在第3部分.
这是我的(诚然粗糙的)代码.我正在努力解决如何让mapper函数与我的主键一起工作.我真的不需要主键,但映射器功能需要它.
感谢您的见解.
from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase
def bulk_upload(table, schema, name):
e = create_engine('mssql+pyodbc://MYDB')
s = create_session(bind=e)
m = MetaData(bind=e,reflect=True,schema=schema)
Base = declarative_base(bind=e,metadata=m)
t = Table(name,m)
m.remove(t)
t.drop(checkfirst=True)
sqld = SQLDatabase(e, schema=schema,meta=m)
sqlt = SQLTable(name, sqld, table).table
sqlt.metadata = m
m.create_all(bind=e,tables=[sqlt])
class MyClass(Base):
return
mapper(MyClass, sqlt)
s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
return
Run Code Online (Sandbox Code Playgroud)