使用SQLAlchemy ORM批量插入

Nic*_*den 104 python mysql database orm sqlalchemy

有没有办法让SQLAlchemy进行批量插入而不是插入每个单独的对象.即

这样做的:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
Run Code Online (Sandbox Code Playgroud)

而不是:

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)
Run Code Online (Sandbox Code Playgroud)

我刚刚转换了一些代码来使用sqlalchemy而不是原始的sql,虽然它现在更好用,但它现在看起来更慢(高达10倍),我想知道这是否是原因.

也许我可以更有效地使用会话来改善这种情况.在我添加了一些东西之后,我autoCommit=False现在做了一件事session.commit().虽然这似乎导致数据在其他地方更改数据库时变得陈旧,即使我做了一个新的查询,我仍然会得到旧的结果?

谢谢你的帮助!

Pie*_*rre 136

SQLAlchemy在版本中介绍了1.0.0:

批量操作 - SQLAlchemy docs

通过这些操作,您现在可以进行批量插入或更新!

例如,你可以这样做:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
Run Code Online (Sandbox Code Playgroud)

在这里,将制造散装插入物.

  • 你还需要s.commit()来实际保存记录(我花了一些时间来解决这个问题). (23认同)
  • 我用sqlachemy 1.0.11尝试了这个,它仍然生成3个插入语句.但它比普通的orm操作快得多. (2认同)
  • 虽然与OP问题无关,但值得一提的是这确实打破了ORM的某些特征.http://docs.sqlalchemy.org/en/rel_1_0/orm/persistence_techniques.html#orm-compatibility (2认同)

dha*_*fey 30

据我所知,没有办法让ORM发出批量插入.我认为潜在的原因是SQLAlchemy需要跟踪每个对象的身份(即新的主键),而批量插入会干扰它.例如,假设您的foo表包含一id列并映射到一个Foo类:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1
Run Code Online (Sandbox Code Playgroud)

由于SQLAlchemy在x.id不发出另一个查询的情况下获取了值,因此我们可以推断出它直接从INSERT语句中获取了值.如果您不需要通过相同的实例随后访问创建的对象,则可以跳过插入的ORM层:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
Run Code Online (Sandbox Code Playgroud)

SQLAlchemy无法将这些新行与任何现有对象匹配,因此您必须重新查询它们以进行任何后续操作.

就陈旧数据而言,记住会话没有内置方式来了解数据库何时在会话之外进行更改是有帮助的.为了通过现有实例访问外部修改的数据,必须将实例标记为已过期.默认情况下会发生这种情况session.commit(),但可以通过调用session.expire_all()或手动完成session.expire(instance).一个例子(SQL省略):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42
Run Code Online (Sandbox Code Playgroud)

session.commit()expires x,因此第一个print语句隐式打开一个新事务并重新查询其x属性.如果您注释掉第一个print语句,您会注意到第二个语句现在选择了正确的值,因为直到更新之后才会发出新查询.

从事务隔离的角度来看,这是有道理的 - 您应该只在事务之间进行外部修改.如果这给您带来麻烦,我建议澄清或重新考虑您的应用程序的交易边界,而不是立即达成session.expire_all().

  • 替代风格:`session.execute(Foo .__ table __.insert(),values)` (10认同)
  • 请注意,较新版本的sqlalchemy具有批量插入功能:http://docs.sqlalchemy.org/en/latest/orm/persistence_techniques.html#bulk-operations (6认同)

Gra*_*ies 25

sqlalchemy文档对可用于批量插入的各种技术的性能进行了很好的描述:

ORM基本上不适用于高性能批量插入 - 这是SQLAlchemy除了将ORM作为一流组件之外还提供Core的全部原因.

对于快速批量插入的用例,ORM构建的SQL生成和执行系统是Core的一部分.直接使用这个系统,我们可以生成一个直接使用原始数据库API的INSERT.

或者,SQLAlchemy ORM提供Bulk Operations方法套件,这些方法为工作单元的子部分提供钩子,以便通过一定程度的基于ORM的自动化发出Core级别的INSERT和UPDATE结构.

下面的示例说明了几种不同的插入行的方法的基于时间的测试,从最自动化到最不自动化.使用cPython 2.7,观察到运行时:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec
Run Code Online (Sandbox Code Playgroud)

脚本:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
Run Code Online (Sandbox Code Playgroud)


reu*_*ano 14

我通常使用它add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
Run Code Online (Sandbox Code Playgroud)

  • 我不认为这太违反直觉了 - 事实上它确实是_add_ _all_你要求的东西.没有任何关于将所有内容添加到会话似乎意味着它将暗示发布什么底层SQL语句.看一下来源:https://github.com/zzzeek/sqlalchemy/blob/ea36338b2e5f621287e9890deeffb6c5f6ff0874/lib/sqlalchemy/orm/session.py#L1784事实上它确实看起来只是`.add`每个项目. (3认同)
  • 你确定这个有效吗?它不只是一次只对`session添加一个会话? (2认同)
  • @reubano `add_all` 发出单独的 `INSERT INTO` 。这与问题所要求的相反。你不必相信我,只要检查查询日志即可。 (2认同)

use*_*082 12

从版本0.8开始,SQLAlchemy添加了直接支持

根据文档,connection.execute(table.insert().values(data))应该做的伎俩.(请注意,这是一样的connection.execute(table.insert(), data),其通过将呼叫导致许多单个行插入executemany).除了本地连接之外,性能上的差异可能是巨大的.


jua*_*gan 7

SQLAlchemy在版本中介绍了1.0.0:

批量操作 - SQLAlchemy docs

通过这些操作,您现在可以进行批量插入或更新!

例如(如果您希望简单表INSERT的开销最低),您可以使用Session.bulk_insert_mappings():

loadme = [
        (1, 'a')
    ,   (2, 'b')
    ,   (3, 'c')
    ]

dicts = []
for i in range(len(loadme)):
    dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()
Run Code Online (Sandbox Code Playgroud)

或者,如果您愿意,可以跳过loadme元组并直接编写字典dicts(但我发现更容易将所有冗余的数据从数据中删除并在循环中加载字典列表).


Mat*_*sen 7

Piere的答案是正确的,但有一个问题是bulk_save_objects默认情况下不返回对象的主键,如果你担心的话.设置return_defaultsTrue获取此行为.

文档在这里.

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()
Run Code Online (Sandbox Code Playgroud)

  • 必须谨慎对待旗帜。它将一次按顺序插入一个对象,并且可能不会获得显着的性能增益 [1]。就我而言,性能下降了,我怀疑这是由于开销造成的。[1]:https://docs.sqlalchemy.org/en/13/orm/session_api.html#sqlalchemy.orm.session.Session.bulk_save_objects.params.return_defaults (2认同)

Eef*_*ret 5

这是一种方法:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])
Run Code Online (Sandbox Code Playgroud)

这样插入:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
Run Code Online (Sandbox Code Playgroud)

参考:SQLAlchemy FAQ包含各种提交方法的基准。


chj*_*und 5

条条大路通罗马,但其中一些横穿山脉,需要渡轮,但如果您想快速到达那儿,请乘高速公路。


在这种情况下,高速公路将使用psycopg2execute_batch()功能。该文档说的最好:

当前的实现executemany()(使用非常慈善的轻描淡写)不是特别有效。这些功能可用于加快针对一组参数的语句的重复执行。通过减少服务器往返次数,性能可以比使用更好executemany()

在我自己的测试execute_batch()快2倍左右executemany(),并给出配置进行进一步的调整所以page_size的选项(如果你想挤进业绩的最后2-3%的驾驶者)。

如果使用SQLAlchemy,则可以通过use_batch_mode=True在实例化引擎时将其设置为参数来轻松启用相同功能。create_engine()