SQLAlchemy: group by day over multiple tables

ssc*_*ssc 5 python postgresql timestamp group-by sqlalchemy

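In my Flask application, I have something similar to a bank account: one User has one Account, credit entries are modeled as Incomings, and deductions are modeled as Outgoings.

The problem:

Get an "account statement" for one user, i.e. the credit entries / deductions per day, e.g.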

Thu 29 Aug 2019
  Some deduction: -23.00
  Some credit: 123.00
Fri 30 Aug 2019
  Big credit: 4223.00
  Another deduction: -42.00

My data model:

This is what (a simplified version of) my models.py looks like:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy     import Column, Float, ForeignKey, Integer, Text, TIMESTAMP
from sqlalchemy.orm import relationship

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id        = Column(Integer, primary_key=True)
    balance   = Column(Float,   nullable=False)
    userID    = Column(Integer, ForeignKey('user.id'))
    incomings = relationship("Incoming", back_populates="account")
    outgoings = relationship("Outgoing", back_populates="account")
    user      = relationship("User",     back_populates="account")

class Incoming(Base):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

class Outgoing(Base):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

class User(Base):
    __tablename__ = 'user'
    id      = Column(Integer,   primary_key=True)
    name    = Column(Text,      nullable=False)
    account = relationship("Account", back_populates="user")

My general intended approach:

  1. get all Incomings for the user, grouped by day
  2. get all Outgoings for the user, grouped by day
  3. somehow merge the two lists, grouped by day

My background:

It's been a while since I last worked with the underlying database, PostgreSQL (though I did manage to set up a trigger function to auto-update the balance), and as far as SQLAlchemy (the ORM in use) is concerned, I seem to have merely scratched the surface.

Step 1: get all Incomings for the user, grouped by day

Following the first SO hit, I tried

from sqlalchemy import func

# existing sample account ID
accountID  = 42
# not relevant to the point at hand, known to work
db_session = get_a_scoped_session_from_elsewhere()

db_incomings = db_session.query(Incoming)                         \
                         .filter(Incoming.accountID == accountID) \
                         .group_by(func.day(Incoming.timestamp))  \
                         .all()

but this fails with

ProgrammingError: (psycopg2.errors.UndefinedFunction) ...
 ... function day(timestamp without time zone) does not exist

which seems to indicate that PostgreSQL doesn't support day.

According to this answer,

db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(func.date_trunc('day', Incoming.timestamp)) \
                         .all()

works for PostgreSQL, but for me fails with

ProgrammingError: (psycopg2.errors.GroupingError) ...
 ... column "incoming.id" must appear in the GROUP BY clause ...
 ... or be used in an aggregate function

When I just blindly do what the error message tells me and add incoming.id to the GROUP BY clause, as in

db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(Incoming.id,
                                   func.date_trunc('day', Incoming.timestamp)) \
                         .all()

the code works, but does not return the wanted result; instead, I get a list of objects like

{'timestamp': datetime.datetime(2019, 8, 29, 10, 4, 27, 459000), 'id': 1, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 21, 493000), 'id': 2, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 42, 660000), 'id': 3, 'accountID': 42, ...}

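which isn't really surprising, considering I'm grouping by Incoming.id.

Trying to understand the underlying issue (see e.g. here or here), it seems I cannot reference a field in the SELECT statement (i.e. the SQLAlchemy .query) if it doesn't appear in the GROUP BY clause (i.e. the SQLAlchemy .group_by). Looking at the error message, the reverse also seems to be true.

I've been racking my brain for a couple of hours now, found lots of alternatives to func.date_trunc and have 800 browser tabs open, but I still have no idea how to approach this.

My question: how do I need to structure / build up the SQLAlchemy query?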

Ilj*_*ilä 5

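SQL works with and returns tabular data (or relations, if you prefer to think of it that way, but not all SQL tables are relations). This implies that a nested table such as the one depicted in the question is not that common a feature. There are ways to produce something of the kind in Postgresql, for example using arrays of JSON or composites, but it is entirely possible to just fetch tabular data and perform the nesting in the application. Python has itertools.groupby(), which fits the bill quite well, given sorted data.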

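The error column "incoming.id" must appear in the GROUP BY clause... is saying that non-aggregates in the select list, HAVING clause, etc. must appear in the GROUP BY clause or be used in an aggregate, lest they have possibly indeterminate values. In other words, the value would have to be picked from just some row in the group, because GROUP BY condenses the grouped rows into a single row, and it would be anyone's guess which row it was picked from. An implementation might allow this, like SQLite does and MySQL used to do, but the SQL standard forbids it. The exception to the rule is when there is a functional dependency, i.e. the GROUP BY clause determines the non-aggregates. Think of a join between tables A and B grouped by A's primary key. No matter which row in a group the system picks the values for A's columns from, they are the same, since the grouping was done on the primary key.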
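To make the rule concrete, here is a minimal sketch of a grouped query in which every selected column is either the grouping expression or an aggregate. It uses Python's built-in sqlite3 module with an in-memory table as a stand-in for the PostgreSQL setup in the question (an assumption made only so that the snippet runs anywhere); the sample rows are made up, and SQLite's date() takes the place of date_trunc('day', ...).

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE incoming ("
    " id INTEGER PRIMARY KEY,"
    " accountID INTEGER,"
    " amount REAL,"
    " description TEXT,"
    " timestamp TEXT)"
)
# Made-up sample rows: two entries on one day, one on the next.
conn.executemany(
    "INSERT INTO incoming (accountID, amount, description, timestamp)"
    " VALUES (?, ?, ?, ?)",
    [
        (42, 123.0, "Some credit", "2019-08-29 10:04:27"),
        (42, 7.0, "Small credit", "2019-08-29 10:08:21"),
        (42, 4223.0, "Big credit", "2019-08-30 09:00:00"),
    ],
)

# Only the grouping expression and aggregates are selected, so each
# output row is well defined: one row per day.
per_day = conn.execute(
    "SELECT date(timestamp) AS day, COUNT(*) AS entries, SUM(amount) AS total"
    " FROM incoming"
    " WHERE accountID = ?"
    " GROUP BY date(timestamp)"
    " ORDER BY day",
    (42,),
).fetchall()

for day, entries, total in per_day:
    print(day, entries, total)
```

Adding id or description to the select list of this query is what PostgreSQL rejects, because a single day can hold several entries with different values for those columns.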

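To address the 3-point general intended approach, one way would be to select a union of incoming and outgoing, ordered by their timestamps. Since there is no inheritance hierarchy set up (there might not even be one; I'm not familiar with accounting), reverting to Core and plain result tuples makes things easier in this case: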

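from sqlalchemy import literal, select

# Tag every row with its source table so both kinds of entry fit one result set.
# select([...]) with a list is the legacy (pre-1.4) calling style.
incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

# Combine both selects and order the combined rows by timestamp.
all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)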
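For readers who think in SQL, the statement above corresponds roughly to a UNION of two tagged SELECTs with a final ORDER BY. The sketch below spells that out with Python's built-in sqlite3 and made-up rows; it is an approximation for illustration, not the exact SQL that SQLAlchemy emits, and SQLite stands in for PostgreSQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Two tables with the same shape as the Incoming / Outgoing models.
for table in ("incoming", "outgoing"):
    conn.execute(
        f"CREATE TABLE {table} ("
        " id INTEGER PRIMARY KEY, accountID INTEGER, amount REAL,"
        " description TEXT, timestamp TEXT)"
    )
# Made-up sample data.
conn.execute(
    "INSERT INTO incoming VALUES (1, 42, 123.0, 'Some credit', '2019-08-29 10:08:21')"
)
conn.execute(
    "INSERT INTO outgoing VALUES (1, 42, 23.0, 'Some deduction', '2019-08-29 10:04:27')"
)
conn.execute(
    "INSERT INTO incoming VALUES (2, 42, 4223.0, 'Big credit', '2019-08-30 09:00:00')"
)

# A literal 'type' column records which table each row came from;
# the ORDER BY applies to the combined result.
rows = conn.execute(
    "SELECT 'incoming' AS type, id, accountID, amount, description, timestamp"
    " FROM incoming WHERE accountID = :account"
    " UNION"
    " SELECT 'outgoing' AS type, id, accountID, amount, description, timestamp"
    " FROM outgoing WHERE accountID = :account"
    " ORDER BY timestamp",
    {"account": 42},
).fetchall()

for row in rows:
    print(row)
```

The literal type column is what keeps an incoming row and an outgoing row with the same id distinguishable after the union.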

Then, to form the nested structure, itertools.groupby() is used:

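from itertools import groupby
# Rows arrive ordered by timestamp, so rows of the same day are adjacent.
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]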

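The end result is a list of 2-tuples, each holding a date and a list of entry dictionaries, in ascending date order. Not quite an ORM solution, but it gets the job done. An example: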

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]
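
The grouping step can also be tried without a database. The following is a minimal sketch that turns entry dictionaries shaped like the ones above into the statement layout asked for in the question. The sample entries and the helper name format_statement are made up for illustration, and outgoing amounts are assumed to be stored as positive numbers that are shown with a minus sign.

```python
from datetime import datetime
from itertools import groupby

# Made-up rows shaped like the dictionaries in the example output.
entries = [
    {"type": "outgoing", "amount": 23.0, "description": "Some deduction",
     "timestamp": datetime(2019, 8, 29, 10, 4, 27)},
    {"type": "incoming", "amount": 123.0, "description": "Some credit",
     "timestamp": datetime(2019, 8, 29, 10, 8, 21)},
    {"type": "incoming", "amount": 4223.0, "description": "Big credit",
     "timestamp": datetime(2019, 8, 30, 9, 0, 0)},
    {"type": "outgoing", "amount": 42.0, "description": "Another deduction",
     "timestamp": datetime(2019, 8, 30, 17, 30, 0)},
]


def format_statement(entries):
    """Return statement lines: one header per day, then that day's entries."""
    # groupby() only merges adjacent items, so sort by timestamp first.
    ordered = sorted(entries, key=lambda ent: ent["timestamp"])
    lines = []
    for day, group in groupby(ordered, key=lambda ent: ent["timestamp"].date()):
        lines.append(day.strftime("%a %d %b %Y"))
        for ent in group:
            # Assumption: deductions are stored positive and displayed negative.
            sign = -1 if ent["type"] == "outgoing" else 1
            lines.append(f"  {ent['description']}: {sign * ent['amount']:.2f}")
    return lines


statement = format_statement(entries)
print("\n".join(statement))
```

Sorting inside the helper makes it safe to call on unsorted input; when the rows already come ordered from the database, as in the query above, the sort is redundant but harmless.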

As mentioned, Postgresql can produce pretty much the same result itself, using an array of JSON:

from sqlalchemy import func, literal, literal_column, select
from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()

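If Incoming and Outgoing can in fact be thought of as children of a common base, for example Entry, using unions can be somewhat automated with concrete table inheritance: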

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

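Unfortunately, using AbstractConcreteBase requires a manual call to configure_mappers() once all the necessary classes have been defined; in this case the earliest possibility is after defining User, because Account depends on it through relationships: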

from sqlalchemy.orm import configure_mappers
configure_mappers()

Then, to fetch all Incoming and Outgoing in a single polymorphic ORM query, use Entry:

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

and proceed to use itertools.groupby() as above on the resulting list of Incoming and Outgoing.


P.S. Be careful with binary floating point and money. We once had a fun time figuring out why a purchase of 40.80 ended up as 40.79.
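
One way such a discrepancy can arise (an illustration only, not necessarily what happened in the case above) is truncating a binary float while converting an amount to whole cents. The sketch below compares that with exact decimal arithmetic from Python's decimal module; on the database side, SQLAlchemy's Numeric column type is the usual alternative to Float for amounts.

```python
from decimal import Decimal

# Converting a price to whole cents via binary floating point:
# 40.80 has no exact binary representation, so the product lands
# just below 4080 and int() truncates it.
cents_float = int(40.80 * 100)

# The same conversion with exact decimal arithmetic.
cents_decimal = int(Decimal("40.80") * 100)

print(cents_float, cents_decimal)
```

This prints 4079 4080: the float path loses a cent, while the Decimal path keeps the amount exact.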