SQLAlchemy - 将子查询的模式和数据复制到另一个数据库

San*_*eep 19 python sql sqlalchemy

我试图将子查询中的数据从postgres(from_engine)复制到sqlite数据库.我可以使用以下命令复制表来实现此目的:

smeta = MetaData(bind=from_engine)
table = Table(table_name, smeta, autoload=True)
table.metadata.create_all(to_engine)
Run Code Online (Sandbox Code Playgroud)

但是,我不确定如何为子查询语句实现相同的功能.

-Sandeep

编辑:跟进答案.创建表后,我想创建一个子查询stmt,如下所示:

table = Table("newtable", dest_metadata, *columns)
stmt = dest_session.query(table).subquery();
Run Code Online (Sandbox Code Playgroud)

但是,最后一个stmt以错误cursor.execute(语句,参数)结束sqlalchemy.exc.ProgrammingError :( ProgrammingError)关系"newtable"不存在LINE 3:FROM newtable)AS anon_1

Ste*_*fan 25

一种方法至少在某些情况下起作用:

  1. 使用column_descriptions查询对象获取有关结果集中列的一些信息.

  2. 使用该信息,您可以构建模式以在另一个数据库中创建新表.

  3. 在源数据库中运行查询并将结果插入新表中.

第一个示例的设置:

from sqlalchemy import create_engine, MetaData, 
from sqlalchemy import Column, Integer, String, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Engine to the database to query the data from
# (postgresql)
source_engine = create_engine('sqlite:///:memory:', echo=True)
SourceSession = sessionmaker(source_engine)

# Engine to the database to store the results in
# (sqlite)
dest_engine = create_engine('sqlite:///:memory:', echo=True)
DestSession = sessionmaker(dest_engine)

# Create some toy table and fills it with some data
Base = declarative_base()
class Pet(Base):
    __tablename__ = 'pets'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    race = Column(String)

Base.metadata.create_all(source_engine)
sourceSession = SourceSession()
sourceSession.add(Pet(name="Fido", race="cat"))
sourceSession.add(Pet(name="Ceasar", race="cat"))
sourceSession.add(Pet(name="Rex", race="dog"))
sourceSession.commit()
Run Code Online (Sandbox Code Playgroud)

现在到有趣的位:

# This is the query we want to persist in a new table:
query= sourceSession.query(Pet.name, Pet.race).filter_by(race='cat')

# Build the schema for the new table
# based on the columns that will be returned 
# by the query:
metadata = MetaData(bind=dest_engine)
columns = [Column(desc['name'], desc['type']) for desc in query.column_descriptions]
column_names = [desc['name'] for desc in query.column_descriptions]
table = Table("newtable", metadata, *columns)

# Create the new table in the destination database
table.create(dest_engine)

# Finally execute the query
destSession = DestSession()
for row in query:
    destSession.execute(table.insert(row))
destSession.commit()
Run Code Online (Sandbox Code Playgroud)

应该有更有效的方法来完成最后一个循环.但批量插入是另一个主题.

  • 批量添加并不复杂:destSession.add_all(list_of_rows) (2认同)
  • 对批量插入的评论:为了运行 destSession.add_all(list_of_rows),必须通过运行 [sqlalchemy.orm.session.make_transient(row) for row in list_of_rows] 将每个结果行上的 _sa_instance_state 重置为“瞬态”。 (2认同)

Pau*_*eux 8

您还可以查看 Pandas 数据框。例如,一种方法将使用pandas.read_sql(query, source.connection)df.to_sql(table_name, con=destination.connection)

  • to_sql() 方法不捕获表之间的关系。 (3认同)