使用SQLAlchemy从MS SQL到PostgreSQL的数据迁移

ile*_*led 7 sql-server postgresql data-migration postgis sqlalchemy

TL; DR

我想将数据从MS SQL Server + ArcSDE迁移到PostgreSQL + PostGIS,理想情况下使用SQLAlchemy.


我使用SQLAlchemy 1.0.11将现有数据库从MS SQL 2012迁移到PostgreSQL 9.2(升级到9.5计划).

我一直在读这个,发现了几个不同的来源(Tyler Lesmann,Inada Naoki,Stefan UrbanekMathias Fussenegger),他们采用了类似的方法来完成这项任务:

  1. 连接到两个数据库
  2. 反映源数据库的表
  3. 迭代表格和每个表格
    1. 在目标数据库中创建一个相等的表
    2. 获取源中的行并将其插入目标数据库中

以下是使用上一个参考中的代码的简短示例.

from sqlalchemy import create_engine, MetaData

src = create_engine('mssql://user:pass@host/database?driver=ODBC+Driver+13+for+SQL+Server')
dst = create_engine('postgresql://user:pass@host/database')

meta = MetaData()
meta.reflect(bind=src)

tables = meta.tables

for tbl in tables:
    data = src.execute(tables[tbl].select()).fetchall()
    if data:
        dst.execute(tables[tbl].insert(), data)
Run Code Online (Sandbox Code Playgroud)

我知道同时获取所有行是一个坏主意,它可以使用迭代器或使用fetchmany,但现在不是我的问题.

问题1

所有这四个例子都失败了我的数据库.我得到的一个错误与类型列有关NVARCHAR:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) type "nvarchar" does not exist
LINE 5:  "desigOperador" NVARCHAR(100) COLLATE "SQL_Latin1_General_C...
                         ^
 [SQL: '\nCREATE TABLE "Operators" (\n\t"idOperador" INTEGER NOT NULL, \n\t"idGrupo" INTEGER, \n\t"desigOperador" NVARCHAR(100) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"Rua" NVARCHAR(200) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"Localidade" NVARCHAR(200) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\t"codPostal" NVARCHAR(10) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\tdataini DATETIME, \n\tdataact DATETIME, \n\temail NVARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\turl NVARCHAR(50) COLLATE "SQL_Latin1_General_CP1_CI_AS", \n\tPRIMARY KEY ("idOperador")\n)\n\n']
Run Code Online (Sandbox Code Playgroud)

我从这个错误的理解是,PostgreSQL没有NVARCHAR,但是VARCHAR,这应该是等价的.我认为SQLAlchemy会自动将它们映射到String它的抽象层,但在这种情况下它可能不会那样工作.

问题:我是否应事先定义所有类/表,例如in models.py,以避免这样的错误?如果是这样,它将如何与给定(或其他)工作流程集成?

事实上,这个错误是从Urbanek运行代码获得的,我可以在其中指定要复制的表.运行上面的示例,让我...

问题2

MS SQL安装是使用地理数据库的ArcSDE(空间数据库引擎).因此,某些列属于非默认的几何类型.在PostgreSQL方面,我使用的是PostGIS 2.

当尝试复制具有这些类型的表时,我收到如下警告:

/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/mssql/base.py:1791: SAWarning: Did not recognize type 'geometry' of column 'geom'
  (type, name))
/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/mssql/base.py:1791: SAWarning: Did not recognize type 'geometry' of column 'shape'
Run Code Online (Sandbox Code Playgroud)

后来又出现了另一个错误(这个错误在执行上面提供的代码时实际上被抛出):

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) relation "SDE_spatial_references" does not exist
LINE 1: INSERT INTO "SDE_spatial_references" (srid, description, aut...
                    ^
Run Code Online (Sandbox Code Playgroud)

我认为它无法创建警告中引用的列,但是当需要这些列时,错误会在稍后的步骤中抛出.

问题:问题是前一个问题的扩展:如何使用自定义(或定义的其他地方)类型进行迁移?

我知道可以与PostGIS一起使用的GeoAlchemy2.GeoAlchemy支持MS SQL Server 2008,但在这种情况下,我想我已经陷入了SQLAlchemy 0.8.4(可能具有不太好的功能).此外,我在这里发现可以使用GeoAlchemy定义的类型进行反射.但是,我的问题仍然存在.

可能有关系

编辑

当我看到错误引用时,SDE_spatial_references我认为它可能与ArcSDE有关,因为同一台机器也安装了ArcGIS for Server.然后我了解到MS SQL Server也有一些空间数据类型,然后我确认了这种情况.我对这个编辑错了:数据库确实在使用ArcSDE.

编辑2

以下是我忘记包含的更多细节.

不必使用SQLAlchemy进行迁移.我认为那是个好主意,因为:

  • 我更喜欢使用Python
  • 解决方案必须与FOSS一起使用
  • 理想情况下,它可以很容易地重现,并且可以启动和等待
  • 迁移后,我想使用Alembic进行进一步的模式迁移

我尝试过的其他事情都失败了(现在不记得确切的原因,但如果有任何答案,我会再次通过它们):

  • 水壶
  • Geokettle
  • ogr2ogr(仍在尝试这种方法)

数据库细节:

  • 小型数据库,±3 GB
  • ±40桌
  • 有表格包含空间和非空间数据
  • 这两个数据库(SQL Server和PostgreSQL)位于同一台服务器上,运行Windows Server 2008
  • 停机时间没有大问题(最多8小时就可以了)

ile*_*led 5

这是我使用SQLAlchemy的解决方案。这是一篇类似长博客的文章,我希望这是可以接受的,并且对某人有用。

尽管未经测试,但这也可能适用于数据库和目标数据库的其他组合(分别为MS SQL Server和PostgreSQL)。

工作流程(TL; DR的排序)

  1. 自动检查源并推断现有的表模型(这称为反射)。
  2. 导入先前定义的表模型,这些模型将用于在目标中创建新表。
  3. 遍历表模型(源模型和目标模型中都存在的模型)。
  4. 对于每个表,从源中获取行的大块并将其插入目标。

要求


详细步骤

1.连接到数据库

SQLAlchemy将引擎调用引擎,该引擎处理应用程序和实际数据库之间的连接。因此,要连接到数据库,必须使用相应的连接字符串创建引擎。数据库URL的典型形式为:

dialect+driver://username:password@host:port/database
Run Code Online (Sandbox Code Playgroud)

您可以在SQLAlchemy文档中看到一些连接URL的示例。

一旦创建,引擎将不会建立连接,直到通过.connect()方法或调用依赖于该方法的操作(例如.execute())明确告知这样做为止。

con = ms_sql.connect()
Run Code Online (Sandbox Code Playgroud)

2.定义和创建表

2.1源数据库

源端的表已经定义,因此我们可以使用表反射:

from sqlalchemy import MetaData

metadata = MetaData(source_engine)
metadata.reflect(bind=source_engine)
Run Code Online (Sandbox Code Playgroud)

如果尝试此操作,则可能会看到一些警告。例如,

SAWarning: Did not recognize type 'geometry' of column 'Shape'
Run Code Online (Sandbox Code Playgroud)

这是因为SQLAlchemy无法自动识别自定义类型。在我的特定情况下,这是因为有ArcSDE类型。但是,当您只需要读取数据时,这没有问题。只需忽略这些警告即可。

表反射后,您可以通过该元数据对象访问现有表。

# see all the tables names
print list(metadata.tables)
# handle the table named 'Troco'
src_table = metadata.tables['Troco']
# see that table columns
print src_table.c
Run Code Online (Sandbox Code Playgroud)

2.2目标数据库

对于目标,因为我们正在启动新数据库,所以无法使用表反射。但是,通过SQLAlchemy创建表模型并不复杂;实际上,它甚至可能比编写纯SQL更简单。

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class SomeClass(Base):
    __tablename__ = 'some_table'
    id = Column(Integer, primary_key=True)
    name =  Column(String(50))
    Shape = Column(Geometry('MULTIPOLYGON', srid=102165))
Run Code Online (Sandbox Code Playgroud)

在此示例中,有一列包含空间数据(由于有了GeoAlchemy2,在此进行了定义)。

现在,如果您有十分之几的表,那么定义那么多的表可能令人困惑,乏味或容易出错。幸运的是,有一个sqlacodegen,该工具可以读取现有数据库的结构并生成相应的SQLAlchemy模型代码。例:

pip install sqlacodegen
sqlacodegen mssql:///some_local_db --outfile models.py
Run Code Online (Sandbox Code Playgroud)

因为这里的目的只是迁移数据,而不是迁移架构,所以您可以从源数据库创建模型,并仅将生成的代码调整/更正为目标数据库。

注意:它将生成混合class模型和Table模型。在此处阅读有关此行为的信息。

同样,您将看到有关无法识别的自定义数据类型的类似警告。这就是我们现在必须编辑models.py文件并调整模型的原因之一。以下是一些需要调整的提示:

  • 具有自定义数据类型的列用定义NullType。将它们替换为正确的类型,例如GeoAlchemy2的Geometry。定义时Geometry,传递正确的几何类型(线串,多线串,多边形等)和SRID。
  • PostgreSQL的性格类型是可变长度的能力,并且SQLAlchemy的地图将String默认列给他们,所以我们可以更换所有UnicodeString(...)通过String。请注意,在中不必指定字符数,也不是必需的(不要在此引用我)String
  • 您必须仔细检查,但BIT实际上所有列都可能是Boolean
  • 大多数数值类型(例如,Float(...)Numeric(...)),同样对字符类型,可以简化为Numeric。请注意例外情况和/或某些特定情况。
  • 我注意到定义为索引(index=True)的列存在一些问题。就我而言,因为将要迁移模式,所以现在不应该使用它们,并且可以安全地删除它们。
  • 确保两个数据库中的表名和列名相同(反射表和定义的模型),这是后续步骤的要求。

现在,我们可以将模型和数据库连接在一起,并在目标端创建所有表。

Base.metadata.bind = postgres
Base.metadata.create_all()
Run Code Online (Sandbox Code Playgroud)

请注意,默认情况下,.create_all()不会触摸现有表格。如果要重新创建数据或将数据插入到现有表中,则需要DROP事先进行准备。

Base.metadata.drop_all()
Run Code Online (Sandbox Code Playgroud)

3.获取数据

现在您可以从一侧复制数据,然后将其粘贴到另一侧。基本上,您只需要SELECT为每个表发出查询。这是在SQLAlchemy ORM提供的抽象层上可以轻松实现的事情。

data = ms_sql.execute(metadata.tables['TableName'].select()).fetchall()
Run Code Online (Sandbox Code Playgroud)

但是,这还不够,您将需要更多的控制权。其原因与ArcSDE有关。因为它使用专有格式,所以您可以检索数据,但不能正确解析它。您将获得如下内容:

(1, Decimal('0'), u' ', bytearray(b'\x01\x02\x00\x00\x00\x02\x00\x00\x00@\xb1\xbf\xec/\xf8\xf4\xc0\x80\nF%\x99(\xf9\xc0@\xe3\xa5\x9b\x94\xf6\xf4\xc0\x806\xab>\xc5%\xf9\xc0'))
Run Code Online (Sandbox Code Playgroud)

此处的解决方法是将几何列转换为众所周知的文本(WKT)格式。这种转换必须在数据库端进行。ArcSDE在那里,因此它知道如何进行转换。因此,例如,在TableName中,有一列包含称为shape的空间数据。所需的SQL语句应如下所示:

SELECT [TableName].[shape].STAsText() FROM [TableName]
Run Code Online (Sandbox Code Playgroud)

这使用.STAsText()SQL Server的geometry数据类型方法。

如果您不使用ArcSDE,则不需要执行以下步骤:

  • 遍历表(仅在源和目标中定义的表),
  • 对于每个表,查找几何列(预先列出它们)
  • 建立类似上面的一条SQL语句

构建语句后,SQLAlchemy可以执行它。

(1, Decimal('0'), u' ', bytearray(b'\x01\x02\x00\x00\x00\x02\x00\x00\x00@\xb1\xbf\xec/\xf8\xf4\xc0\x80\nF%\x99(\xf9\xc0@\xe3\xa5\x9b\x94\xf6\xf4\xc0\x806\xab>\xc5%\xf9\xc0'))
Run Code Online (Sandbox Code Playgroud)

实际上,这实际上并没有获取数据(与ORM示例比较-请注意丢失的.fetchall()调用)。为了解释,这里是SQLAlchemy文档的引文:

返回的结果是的实例ResultProxy,该实例引用DBAPI游标,并提供与DBAPI游标的接口很大程度上兼容的接口。ResultProxy 当所有结果行(如果有)用完时,DBAPI游标将被关闭。

数据将仅在插入之前被检索。

4.插入数据

建立连接,创建表,准备数据,现在让我们插入。与获取数据类似,SQLAlchemy还允许INSERT通过其ORM 将数据存储到给定表中:

SELECT [TableName].[shape].STAsText() FROM [TableName]
Run Code Online (Sandbox Code Playgroud)

同样,这很容易,但是由于非标准格式和错误数据,可能需要进一步操作。

4.1匹配列

首先,将源列与目标列(同一张表)匹配时存在一些问题-可能与该Geometry列有关。一个可能的解决方案是创建一个Python字典,该字典将源列中的值映射到目标列的键(名称)。

这是逐行执行的-尽管它并不像人们想象的那样慢,因为实际插入将同时进行几行。因此,每行将有一个字典,并且您将插入一列字典,而不是插入数据对象(这是一个元组列表;一个元组对应于一行)。

这是一个单行的示例。提取的数据是一个包含一个元组的列表,而值是构建的字典。

result = ms_sql.execute(statement)
Run Code Online (Sandbox Code Playgroud)

请注意,Python字典没有排序,这就是两个列表中数字不在同一位置的原因。为了简化,从该示例中删除了几何列。

4.2固定几何

如果未发生此问题,则可能不需要以前的解决方法:有时使用错误的类型存储/检索几何。

在MSSQL / ArcSDE中,几何数据类型不指定要存储的几何类型(例如,线,多边形等)。它只关心它是一个几何。此信息存储在另一个(系统)表中,该表称为SDE_geometry_columns(请参阅该页面底部)。但是,Postgres(实际上是PostGIS)在定义几何列时需要几何类型。

这导致以错误的几何类型存储空间数据。错误是指它与应有的含义不同。例如,查看SDE_geometry_columns表(节选):

f_table_name        geometry_type
TableName               9
Run Code Online (Sandbox Code Playgroud)

geometry_type = 9对应于ST_MULTILINESTRING。但是,TableName表中有一些行以(存储(或接收))ST_LINESTRING。这种不匹配会在Postgres方面引发错误。

解决方法是,您可以在创建上述字典时编辑WKT。例如,'LINESTRING (10 12, 20 22)'将转换为MULTILINESTRING ((10 12, 20 22))'

4.3缺少SRID

最后,如果您愿意保留SRID,则在创建几何列时还需要定义它们。

如果在表模型中定义了SRID,则在Postgres中插入数据时必须满足它。问题在于,使用该.STAsText()方法将几何数据作为WKT提取时,您会丢失SRID信息。

幸运的是,PostGIS支持包含SRID 的扩展WKT(E-WKT)格式。解决方案是在修复几何形状时包括SRID。在相同的示例中,'LINESTRING (10 12, 20 22)'将转换为'SRID=102165;MULTILINESTRING ((10 12, 20 22))'

4.4提取并插入

修复所有问题后,就可以插入了。如前所述,只有现在才实际从源中检索数据。您可以按块(用户定义的数量)的数据(例如,一次1000行)进行此操作。

而True:rows = data.fetchmany(1000)(如果不是,则行:break values = [{key:(val,如果key.lower()!=“ shape” else fix(val,102165))),则zip中的val(键,一行)} [一行一行] postgres_engine.execute(target_table.insert(),values)

fix()是将校正几何形状并将给定的SRID附加到几何列(在此示例中,由“形状”的列名标识)的函数(如上所述),并且值是上述列的字典。

结果

结果是将存在于MS SQL Server + ArcSDE数据库中的架构和数据复制到PostgreSQL + PostGIS数据库中。

以下是我的用例中用于性能分析的一些统计信息。两个数据库都在同一台机器上;该代码是从另一台计算机执行的,但是在同一局域网中。

Tables   |   Geometry Column   |   Rows   |   Fixed Geometries   |   Insert Time
---------------------------------------------------------------------------------
Table 1      MULTILINESTRING      1114797             702              17min12s
Table 2            None            460874             ---               4min55s
Table 3      MULTILINESTRING       389485          389485               4min20s
Table 4        MULTIPOLYGON          4050            3993                   34s
Total                             3777964          871243              48min27s
Run Code Online (Sandbox Code Playgroud)