我有一个函数,它接受page_size和skip作为参数.如果page_size = 10和skip = 2,我想从第21行开始选择10行.我认为这与LIMIT和OFFSET.我如何在SQLAlchemy中执行此操作?
我在SqlAlchemy中Table调用journals了一个具有以下列之一:
Column('type', String(128))
Run Code Online (Sandbox Code Playgroud)
然后我将mapper它映射journals到Journalspython类:
mapper(Journals, journals);
Run Code Online (Sandbox Code Playgroud)
我想插入一个新行journals并设置type为NULL.
什么是相应的Python类型为NULL?
这是一个例子:
self.type = NULL;
Run Code Online (Sandbox Code Playgroud)
NULLPython中没有类型,所以我不知道如何做.
我正在使用带有WTForms和SQLAlchemy的Flask.我目前有这个设置:
一个SQLAlchemy类:
class User(Base):
__tablename__ = 'user'
name = db.Column(db.String)
last_name = db.Column(db.String)
__init__(name, last_name):
self.name = name
self.last_name = last_name
Run Code Online (Sandbox Code Playgroud)
相应的表格:
class CreateUserForm(Form):
name = StringField('Name')
last_name = StringField('Last name')
Run Code Online (Sandbox Code Playgroud)
路线:
@user.route('/', methods=['POST'])
def create():
form = CreateUserForm(request.form)
if form.validate():
user = User(form.name.data, form.last_name.data)
...
Run Code Online (Sandbox Code Playgroud)
这只是一个简化的例子,但我想知道的是,如果我能以某种方式将表单变量传递给User构造函数并一直传递给SQLAlchemy中的User类?由于构造函数中的表单与用户数据库表中的字段相同,因此它会很好.
我希望我的路线看起来像这样:
@user.route('/', methods=['POST'])
def create():
form = CreateUserForm(request.form)
if form.validate():
user = User(form)
...
Run Code Online (Sandbox Code Playgroud)
所以我不必在每个部分中处理form.name和form.last_name.
目标:创建一个SQLAlchemy属性,该属性跟踪/跟踪另一个对象的SQLAlchemy属性中的更改.
鉴于:
class ClazzA():
attributeA = Column(JSONDict)
class ClazzB():
attributeB = Column(?)
objectA = ClazzA()
objectA.attributeA = {'foo': 1}
objectB = ClazzB()
objectB.attributeB = objectA.attributeA
objectA.attributeA['foo'] = 2
Run Code Online (Sandbox Code Playgroud)
JSONDict与MutableDict此处描述的相关联:http://docs.sqlalchemy.org/en/latest/orm/extensions/mutable.html#module-sqlalchemy.ext.mutable,即该JSONDict类型允许进行突变跟踪.
所以我们在objectA上有这个字典,其变化由SQLAlchemy记录.我希望attributeB跟踪attributeA,这样即使重新启动应用程序(即从DB重新加载属性),attributeB也将继续反映对attributeA字典所做的更改.
当然,这与Python 不知道指针的事实密切相关.我想知道SQLAlchemy是否有针对这个特定问题的解决方案.
下面的模型加上错误信息.
我试图使用Alembic创建一些数组列,但获取NameError:name'String'未定义.
任何有价值的帮助
谢谢!
from sqlalchemy import Column, String, Integer, DateTime
from serve_spec.db_global import db
import datetime
from time import time
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.dialects.postgresql import ARRAY
class Issues(db.Base):
__tablename__ = 'issues'
id = Column(String, primary_key=True)
thread_id = Column(String, nullable=False)
created = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.utcnow)
created_timestamp = Column(Integer, nullable=False, default=time)
created_by_user_name = Column(String, nullable=False)
is_parent = Column(Integer, nullable=False)
parent_title = Column(String)
subscribed = Column(ARRAY(String))
unsubscribed = Column(ARRAY(String))
pending_notifications_web = Column(ARRAY(String))
pending_notifications_email = Column(ARRAY(String))
markdown_text = Column(String, nullable=False, ) …Run Code Online (Sandbox Code Playgroud) 我正在尝试删除已经发生的数据库中的事件.我的脚本如下.
在**的行上,我收到一个错误
AttributeError: type object 'events' has no attribute 'query'
Run Code Online (Sandbox Code Playgroud)
有谁知道如何解决这一问题?谢谢!
Base = automap_base()
# engine, suppose it has two tables 'user' and 'address' set up
engine = create_engine("DB_URL")
# reflect the tables
Base.prepare(engine, reflect=True)
# mapped classes are now created with names by default
# matching that of the table name.
Event = Base.classes.events
User = Base.classes.users
**events = Event.query.filter(end_time < datetime.now())**
session = Session(engine)
session.delete(events)
session.commit()
Run Code Online (Sandbox Code Playgroud) 我正在尝试对相关对象进行以下过滤的不同项目的SQL Alchemy查询,其等效于以下查询:
SELECT distinct items.item_id, items.item_name
FROM items
INNER JOIN categories as cat on items.category_id = cat.category_id
INNER JOIN stores on cat.store_id = stores.store_id
WHERE store.store_id = 123
Run Code Online (Sandbox Code Playgroud)
我已经创建了以下包含外键的模型,但是当我在下面运行查询时,它不能正确过滤。
items_query = (db.session.query(Store, Item)
.filter(Store.store_id == 123)
).all()
#SQL Alchemy Models
class Store(db.Model):
__tablename__ = 'stores'
store_id = db.Column(db.Integer, primary_key=True, autoincrement=True)
store_name = db.Column(db.String(100), unique=True, nullable=False)
def __repr__(self):
return '<Store>'+str(self.store_name)
class Category(db.Model):
__tablename__ = 'categories'
category_id = db.Column(db.Integer, primary_key=True, autoincrement=True)
category_name = db.Column(db.String(100), unique=True, nullable=False)
store_id = db.Column(db.Integer, db.ForeignKey('stores.store_id'))
store = …Run Code Online (Sandbox Code Playgroud) 我有一个端点从我的数据库中删除一个对象.我用以下代码删除它:
my_object = Object.query.get(id)
db.session.delete(my_object)
db.session.commit()
return json.dumps({'success': True}
Run Code Online (Sandbox Code Playgroud)
我有一个API测试来测试我创建对象的端点,然后使用端点将其删除.我试图断言删除后发生它不在数据库中.
my_object = MyObject()
db.session.add(my_object)
db.session.commit()
response = requests.delete('{}/my-object/{}'.format(
os.environ.get('MY_URL'),
my_object.id
))
self.assertTrue(response.json()['success']) // this passes
self.assertEqual(200, response.status_code) // this passes
result = db.session.query(MyObject).get(my_object.id)
print(result.id) // prints the id of the job even though it is deleted from the database
Run Code Online (Sandbox Code Playgroud)
我认为这与一些SQLAlchemy会话缓存有关.我试过了db.session.flush(),但db.session.expire_all()没有用.该对象实际上正在从数据库中删除.所以我希望查询结果是None.
我在文档中看到了这一点,但没有完全包裹我的脑袋.何时提交和关闭会话
谢谢你的帮助.
我有一个使用Flask-SQLAlchemy连接到数据库的Flask应用程序。
我需要检查的行是否name='reza'存在...
我想我可以使用Any(),Exists()。
我正在尝试从python脚本在redshift中运行查询,但出现错误:
sqlalchemy.exc.InternalError: (psycopg2.InternalError) ALTER EXTERNAL TABLE cannot run inside a transaction block
Run Code Online (Sandbox Code Playgroud)
这是我的代码:
engine = create_engine(SQL_ENGINE % urlquote(REDSHIFT_PASS))
partition_date = (date.today() - timedelta(day)).strftime("%Y%m%d")
query = """alter table {table_name} add partition (dt={date_partition}) location 's3://dft-dwh-files/raw_data/google_analytics/revenue_per_channel/{date_partition}/';""".format(date_partition=partition_date,table_name=table_name)
conn = engine.connect()
conn.execute(query).execution_options(autocommit=True)
Run Code Online (Sandbox Code Playgroud)
我怎样才能解决这个问题?
sqlalchemy ×10
python ×9
flask ×3
alembic ×1
distinct ×1
exists ×1
postgresql ×1
python-3.x ×1
wtforms ×1