如何在插入时使用 SQLAlchemy 解析相关外键?

jla*_*rcy 8 python orm sqlalchemy foreign-keys pandas

动机

我有来自与 Pandas DataFrame 接口的来源的数据。我有一个由 SQLAlchemy ORM 接口的数据模型。为了 MCVE,我已将数据模型规范化为两个表:

  • channel 保存有关记录的元数据(小容量,~1k 行);
  • record保持记录指向channel(更高的容量,90k 行/天)。

目的channel是避免重复。我想要的是record使用带有数据源不知道的约束的 SQLAlchemy 将数据插入到表中channelid

数据源

这是来自源的数据示例(我可以访问的唯一数据):

import pandas as pd
recs = [
    {'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
    {'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
    {'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
    {'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
    {'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)
Run Code Online (Sandbox Code Playgroud)

这是channel从设置中学习到的存储在其中的元数据示例。

recs = [
    {'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
    {'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
    {'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
    {'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
    {'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta= pd.DataFrame(recs)
Run Code Online (Sandbox Code Playgroud)

MCVE

首先让我们从 MCVE 开始!

我们定义数据模型:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

class Channel(Base):
    __tablename__ = 'channel'
    __table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
    id = Column(Integer, primary_key=True)
    serial = Column(String, nullable=False)
    source = Column(Integer, nullable=False)
    channel = Column(Integer, nullable=False)

class Record(Base):
    __tablename__ = 'record'
    __table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
    id = Column(Integer, primary_key=True)
    channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float, nullable=False)
    channel = relationship("Channel")

Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)
Run Code Online (Sandbox Code Playgroud)

我们提供channel表格以反映我们已经拥有的元数据:

with Engine.connect() as dbcon:
    dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))
Run Code Online (Sandbox Code Playgroud)

要解决的问题

现在我们想轻松地插入datarecord表中,但不幸的是我们缺少channelid来自我们的数据源(它不知道它)。显然这个调用失败了:

with Engine.connect() as dbcon:
    with dbcon.begin() as dbtrans:
        dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
        dbtrans.commit()
Run Code Online (Sandbox Code Playgroud)

因为:

IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL:  Failing row contains (6, null, 2019-01-01 08:35:00, 12).
 [SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
Run Code Online (Sandbox Code Playgroud)

我们可以用熊猫来处理它:

meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])
Run Code Online (Sandbox Code Playgroud)

之前的调用将起作用,因为关联channelid已完成:

   channel      serial  source           timestamp  value  channelid
0        4  1618741320       1 2019-01-01 08:35:00     12        239
1        3  1350397285       2 2019-01-01 09:20:00     37         73
2        1   814387724       2 2019-01-01 12:30:00    581        259
3        0   545914014       3 2019-01-01 01:45:00      0         28
4        5   814387724       0 2019-01-01 14:20:00    699        245
Run Code Online (Sandbox Code Playgroud)

但这不是我认为应该解决的方式,主要是因为它使我执行与 Pandas 而不是 SQLAlchemy 的绑定。

我也试过这个,但对于 90k 记录的数据集来说它是完全低效的:

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=Engine)
session = Session()
with session.begin_nested() as trans:
    for rec in data.to_dict(orient='records'):
        c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']}).first()
        r = Record(channelid=c.id, **rec)
        session.add(r)
Run Code Online (Sandbox Code Playgroud)

使用 DataFrame 的时间比以前的方法长近 100 倍。

我将精力集中在构建一个全面的 MCVE 上,因为我在 Pandas 方面比 SQLAlchemy 更流利,而且我在 SQLAlchemy 文档中找不到解决我的问题的方法。

我的问题是:“我如何才能channelid以一种高性能且依赖 SQLAclhemy 而不是 Pandas 的方式解决让我的插入成功?”

请随时发表评论以改进这篇文章。我正在寻找的是一种合理的方式来做到这一点。它可以暗示更新数据模型,我有这种灵活性。

更新

阅读有关 SQLAlchemy 和测试建议的更多信息@Ramasubramanian S,我能做到的最好的是:

ukeys = ['serial', 'source', 'channel']
with session.begin_nested() as trans:
    g = data.groupby(ukeys)
    for key in g.groups:
        recs = []
        for rec in data.loc[g.groups[key],:].to_dict(orient='records'):
            m = {k: rec.pop(k) for k in ukeys}
            c = session.query(Channel).filter_by(**m).first()
            #r = Record(channel=c, **rec)  
            r = Record(channelid=c.id, **rec) # Bulk Insert needs explicit id not a relationship
            recs.append(r)
        #session.add_all(recs)
        session.bulk_save_objects(recs) # Not working w/ relationship
Run Code Online (Sandbox Code Playgroud)

使用Record(channel=c, **rec)该方法session.bulk_save_objects提出的关系:

IntegrityError: (psycopg2.IntegrityError) ERREUR:  une valeur NULL viole la contrainte NOT NULL de la colonne « channelid »
DETAIL:  La ligne en échec contient (1, null, 2019-01-01 08:35:00, 12)

[SQL: INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)]
[parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
(Background on this error at: http://sqlalche.me/e/gkpj)
Run Code Online (Sandbox Code Playgroud)

thenchannelid设置为NULL,好像不能使用relationshipcapability,所以需要显式的passchannelid才能生效。

小智 5

提高插入多条记录性能的一种方法是使用bulk_save_objectsbulk_insert_mappings创建对象并批量插入数据库。

此链接显示了插入多条记录的各种方法的性能比较。

你可以在这里找到类似的答案

干杯