ssc*_*ssc 5 python postgresql timestamp group-by sqlalchemy
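In my Flask application, I have something similar to a bank account: one User has one Account, credit entries are modeled as Incomings, and deductions are modeled as Outgoings.
The problem:
Get an "account statement" for one user, i.e. the credit entries / deductions per day, e.g.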
Thu 29 Aug 2019
Some deduction: -23.00
Some credit: 123.00
Fri 30 Aug 2019
Big credit: 4223.00
Another deduction: -42.00
My data model:
This is what my models.py looks like (simplified version):
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Float, ForeignKey, Integer, Text, TIMESTAMP
from sqlalchemy.orm import relationship

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id = Column(Integer, primary_key=True)
    balance = Column(Float, nullable=False)
    userID = Column(Integer, ForeignKey('user.id'))
    incomings = relationship("Incoming", back_populates="account")
    outgoings = relationship("Outgoing", back_populates="account")
    user = relationship("User", back_populates="account")

class Incoming(Base):
    __tablename__ = 'incoming'
    id = Column(Integer, primary_key=True)
    accountID = Column(Integer, ForeignKey('account.id'))
    amount = Column(Float, nullable=False)
    description = Column(Text, nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)
    account = relationship("Account", back_populates="incomings")

class Outgoing(Base):
    __tablename__ = 'outgoing'
    id = Column(Integer, primary_key=True)
    accountID = Column(Integer, ForeignKey('account.id'))
    amount = Column(Float, nullable=False)
    description = Column(Text, nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)
    account = relationship("Account", back_populates="outgoings")

class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(Text, nullable=False)
    account = relationship("Account", back_populates="user")
My general intended approach is:
Get all Incomings for the user, grouped by day
Get all Outgoings for the user, grouped by day
My background:
It has been a while since I last worked with the underlying database, PostgreSQL (although back then I did manage to set up a trigger function to auto-update the balance), but as far as SQLAlchemy (the ORM in use) is concerned, I seem to have merely scratched the surface.
Get all Incomings for the user, grouped by day
Following the first SO hit, I tried
from sqlalchemy import func

# existing sample account ID
accountID = 42
# not relevant to the point at hand, known to work
db_session = get_a_scoped_session_from_elsewhere()
db_incomings = db_session.query(Incoming) \
    .filter(Incoming.accountID == accountID) \
    .group_by(func.day(Incoming.timestamp)) \
    .all()
but this fails with
ProgrammingError: (psycopg2.errors.UndefinedFunction) ...
... function day(timestamp without time zone) does not exist
which seems to indicate that PostgreSQL does not support day.
According to this answer,
db_incomings = db_session.query(Incoming) \
    .filter(Incoming.accountID == accountID) \
    .group_by(func.date_trunc('day', Incoming.timestamp)) \
    .all()
works with PostgreSQL, but for me fails with
ProgrammingError: (psycopg2.errors.GroupingError) ...
... column "incoming.id" must appear in the GROUP BY clause ...
... or be used in an aggregate function
When I just blindly try to do what the error message tells me and add incoming.id to the GROUP BY clause, as in
db_incomings = db_session.query(Incoming) \
    .filter(Incoming.accountID == accountID) \
    .group_by(Incoming.id,
              func.date_trunc('day', Incoming.timestamp)) \
    .all()
the code works, but does not return the desired result; instead, I get a list of objects such as
{'timestamp': datetime.datetime(2019, 8, 29, 10, 4, 27, 459000), 'id': 1, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 21, 493000), 'id': 2, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 42, 660000), 'id': 3, 'accountID': 42, ...}
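which is not really surprising, considering that I am grouping by Incoming.id.
Trying to understand the underlying issue (see e.g. here or here), it seems I cannot reference a field in the SELECT statement (i.e. the SQLAlchemy .query) if it does not appear in the GROUP BY clause (i.e. the SQLAlchemy .group_by). Looking at the error message, the reverse also seems to be true.
I have been racking my brain for a couple of hours now, found lots of alternatives to func.date_trunc and have 800 browser tabs open, but still have no idea how to approach this.
My question: how do I need to structure / build up the SQLAlchemy query?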
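SQL works with and returns tabular data (or relations, if you prefer to think of it that way, but not all SQL tables are relations). This implies that a nested table such as the one depicted in the question is not that common a feature. There are ways to produce something of the kind in Postgresql, for example using arrays of JSON or composites, but it is entirely possible to just fetch tabular data and perform the nesting in the application. Python has itertools.groupby(), which fits the bill quite well, given sorted data.
The error column "incoming.id" must appear in the GROUP BY clause... is saying that non-aggregates in the select list, the HAVING clause, etc. must appear in the GROUP BY clause or be used in an aggregate, lest they have possibly indeterminate values. In other words, the value would have to be picked from just some row in the group, because GROUP BY condenses the grouped rows into a single row, and it would be anyone's guess which row it was picked from. An implementation might allow this, like SQLite does and MySQL used to do, but the SQL standard forbids it. The exception to the rule is when there is a functional dependency, i.e. the GROUP BY clause determines the non-aggregates. Think of a join between tables A and B grouped by A's primary key. No matter which row in a group the system picks the values of A's columns from, they will be the same, since the grouping was done on the primary key.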
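The rule can be tried out in a small, self-contained sketch. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, so the day is extracted with SQLite's date() rather than date_trunc(), and the table layout and sample rows are made up for illustration. Every selected column is either the grouping expression or an aggregate, which is what the error message asks for:

```python
import sqlite3

# In-memory SQLite database as a stand-in for PostgreSQL (assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE incoming ("
    " id INTEGER PRIMARY KEY, accountID INTEGER, amount REAL, timestamp TEXT)"
)
# Made-up sample rows for a hypothetical account 42.
conn.executemany(
    "INSERT INTO incoming (accountID, amount, timestamp) VALUES (?, ?, ?)",
    [
        (42, 100.0, "2019-08-29 10:04:27"),
        (42, 23.0, "2019-08-29 10:08:21"),
        (42, 4223.0, "2019-08-30 09:00:00"),
    ],
)

# The select list contains only the grouping expression and aggregates.
daily_totals = conn.execute(
    "SELECT date(timestamp), COUNT(*), SUM(amount)"
    " FROM incoming WHERE accountID = ?"
    " GROUP BY date(timestamp) ORDER BY date(timestamp)",
    (42,),
).fetchall()

for day, entry_count, total in daily_totals:
    print(day, entry_count, total)
```

A query like this returns one row per day, which also shows why a plain GROUP BY cannot hand back the individual entries of each day.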
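To address the three-point general intended approach, one way is to select the union of incoming and outgoing entries, ordered by timestamp. Since there is no inheritance hierarchy set up (there might not even be one, I am not familiar with accounting), reverting to Core and plain result tuples makes things easier in this case: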
from sqlalchemy import literal, select

# Legacy select([...]) calling style (SQLAlchemy 1.3); newer releases
# take the columns positionally instead of as a list.
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)
Then, in order to form the nested structure, itertools.groupby() is used:
from itertools import groupby

# The rows are already ordered by timestamp, as groupby() requires.
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]
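The end result is a list of 2-tuples of a date and a list of entry dictionaries, in ascending date order. Not exactly the ORM solution, but it gets the job done. An example: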
In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...:
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...:
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]:
[(datetime.date(2019, 9, 1),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 5,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 2),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 4,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
'type': 'outgoing'}]),
(datetime.date(2019, 9, 3),
[{'accountID': 1,
'amount': 1.0,
'description': 'incoming',
'id': 3,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
'type': 'incoming'},
{'accountID': 1,
'amount': 2.0,
'description': 'outgoing',
'id': 2,
'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
'type': 'outgoing'}])]
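To get from such groups to the statement layout shown in the question, the grouping and formatting can be sketched with the standard library alone. The rows below are hypothetical stand-ins for the query result, with deductions written as negative amounts (an assumption made for this sketch; the Outgoing model stores plain amounts):

```python
from datetime import datetime
from itertools import groupby

# Hypothetical rows: (timestamp, description, signed amount).
entries = [
    (datetime(2019, 8, 30, 9, 15), "Big credit", 4223.00),
    (datetime(2019, 8, 29, 10, 4), "Some deduction", -23.00),
    (datetime(2019, 8, 30, 17, 30), "Another deduction", -42.00),
    (datetime(2019, 8, 29, 10, 8), "Some credit", 123.00),
]

# groupby() only merges adjacent items, so sort by timestamp first.
entries.sort(key=lambda ent: ent[0])

lines = []
for day, group in groupby(entries, key=lambda ent: ent[0].date()):
    # One header line per day, then one line per entry of that day.
    lines.append(day.strftime("%a %d %b %Y"))
    for _, description, amount in group:
        lines.append(f"{description}: {amount:.2f}")

print("\n".join(lines))
```

When the rows come from a query that is already ordered by timestamp, the explicit sort is redundant; it is kept here because groupby() silently produces split groups on unsorted input.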
As mentioned, Postgresql can produce pretty much the same result using an array of JSON:
from sqlalchemy import func, literal, literal_column, select
from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)
outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')
day = func.date_trunc('day', all_entries.c.timestamp)

# One row per day, with that day's entries aggregated into an array of JSON,
# ordered by timestamp within the day.
stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()
If Incoming and Outgoing can in fact be thought of as children of a common base, for example Entry, using unions can be somewhat automated with concrete table inheritance:
from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id = Column(Integer, primary_key=True)
    accountID = Column(Integer, ForeignKey('account.id'))
    amount = Column(Float, nullable=False)
    description = Column(Text, nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)
    account = relationship("Account", back_populates="incomings")
    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id = Column(Integer, primary_key=True)
    accountID = Column(Integer, ForeignKey('account.id'))
    amount = Column(Float, nullable=False)
    description = Column(Text, nullable=False)
    timestamp = Column(TIMESTAMP, nullable=False)
    account = relationship("Account", back_populates="outgoings")
    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }
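Unfortunately, using AbstractConcreteBase requires a manual call to configure_mappers() once all the necessary classes have been defined; in this case the earliest possibility is after defining User, because Account depends on it through relationships:
from sqlalchemy.orm import configure_mappers
configure_mappers()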
Then, in order to fetch all Incoming and Outgoing in a single polymorphic ORM query, use Entry:
session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()
and proceed to use itertools.groupby() as above on the resulting list of Incoming and Outgoing objects.
P.S. Be careful with binary floating point and money. We once had fun figuring out why a purchase of 40.80 ended up as 40.79.
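A minimal sketch of how such a cent can go missing, assuming the amount is converted to whole cents by truncation (the actual cause in the anecdote is not stated, so this is only one plausible mechanism). The decimal module from the standard library avoids the problem, and on the SQLAlchemy side the Numeric column type is the usual alternative to Float for monetary values:

```python
from decimal import Decimal

# 40.80 has no exact binary representation, so the product with 100 can land
# just below the whole number of cents and be truncated by int().
float_cents = int(40.80 * 100)

# Decimal keeps the decimal digits exactly as written.
decimal_cents = int(Decimal("40.80") * 100)

print(float_cents, decimal_cents)
```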