使用 create_engine 将 SQLAlchemy 转换为 MSSQL

ana*_*ine 4 python sql-server sqlalchemy pandas

我能找到的大多数展示 Python 的完整 MSSQL 连接方法的示例在几个月前就已经过时了,这在一定程度上要归功于 SQLAlchemy 1.3中的一些优化。我正在尝试复制我在文档中看到的内容。

我在使用 pyodbc 将 SQLAlchemy 连接到 MSSSQL Server 时遇到问题。

我有一个本地 SQL 服务器,可以从 SQL Server Management Studio 访问:#DESKTOP-QLSOTTG\SQLEXPRESS
数据库是:TestDB
本示例中的用户名是:TestUser
密码,本示例中是:TestUserPass

我想运行一个将 pandas 数据帧导入 MSSQL 数据库的测试用例(案例?),以便找出最快的处理方式。然而,这个问题的目的是围绕连接性。

信用:我从 Gord那里借用了一些用于数据帧/更新的代码。

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


# for pyodbc
#engine = create_engine('mssql+pyodbc://TestUser:TestUserPAss@DESKTOP-QLSOTTG\\SQLEXPRESS:1433/TestDB?driver=ODBC+Driver+17+for+SQL+Server', fast_executemany=True)
engine = create_engine("mssql+pyodbc://TestUser:TestUserPass@DESKTOP-QLSOTTG\\SQLEXPRESS:1433/TestDB?driver=ODBC+Driver+13+for+SQL+Server", fast_executemany=True)

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
Run Code Online (Sandbox Code Playgroud)

我收到的错误如下。我假设服务器“主动拒绝连接”是因为我的连接字符串不知何故搞砸了,但我似乎不明白为什么。:

OperationalError: (pyodbc.OperationalError) ('08001', '[08001] [Microsoft][ODBC Driver 13 for SQL Server]TCP Provider: No connection could be made because the target machine actively refused it.\r\n (10061) (SQLDriverConnect); [08001] [Microsoft][ODBC Driver 13 for SQL Server]Login timeout expired (0); [08001] [Microsoft][ODBC Driver 13 for SQL Server]A network-related or instance-specific error has occurred while establishing a connection to SQL Server. Server is not found or not accessible. Check if instance name is correct and if SQL Server is configured to allow remote connections. For more information see SQL Server Books Online. (10061)')
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Run Code Online (Sandbox Code Playgroud)

数据库和用户可从 SQL Server Management Studio 连接。*

关于我可能会缺少什么的任何想法?

SQL Server 屏幕截图

笔记:

  • 将 DESKTOP-QLSOTTG\SQLEXPRESS:1433 更改为 DESKTOP-QLSOTTG:1433 并没有改变错误
  • 将 DESKTOP-QLSOTTG\SQLEXPRESS:1433 更改为 localhost:1433 并没有改变错误
  • 将 DESKTOP-QLSOTTG\SQLEXPRESS:1433 更改为 localhost\SQLEXPRESS:1433 并没有改变错误

ana*_*ine 8

我将用一个完整的示例来回答这个问题,因为在此过程中我遇到了一些其他问题。

该示例能够:

  • 使用fast_executemany和用户指定的内存友好分块可以非常快速地将数据加载到 MS SQL 数据库。
  • 在大约 0.3 秒内将 10,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 在大约 45 秒内将 1,000,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 在大约 9 分钟内将 10,000,000 条记录(25 列)加载到 Microsoft SQL (MSSQL) 数据库。
  • 使用预配置函数对数据进行分块,避免使用 pandas chunksize,这会导致较大数据集出现内存错误。归功于分块。
  • 可以帮助您将数据加载到 RAM 较低的 SQL 服务器中。大容量加载需要大量内存,较小的加载大小使用的内存要少得多。
  • 可以添加 try/ except 语句来捕获您尝试记录的块的加载错误/稍后重试类型设置。

我为其他一些数据库提供商提供了一些未经测试的连接字符串。截至 2020 年 12 月的 pandas、sqlalchemy、pyodbc 等的当前版本。

%%time #remove this if you are not using a Jupyter notebook and just want to run a .py script

import pandas as pd
import numpy as np
import sqlalchemy as sql
import sys
import math

# Enterprise DB to be used
DRIVER = "ODBC Driver 17 for SQL Server"
USERNAME = "TestUser"
PSSWD = "TestUser"
SERVERNAME = "DESKTOP-QLSOTTG"
INSTANCENAME = "\SQLEXPRESS"
DB = "TestDB"
TABLE = "perftest"


conn_executemany = sql.create_engine(
    f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}", fast_executemany=True
)



def chunker(seq, size):
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def insert_with_progress(df, engine, table="", schema=""):
    con = engine.connect()

    # Replace table
    engine.execute(f"DROP TABLE IF EXISTS {schema}.{table};")

    # Insert with progress
    SQL_SERVER_CHUNK_LIMIT = 100000
    chunksize = math.floor(SQL_SERVER_CHUNK_LIMIT / len(df.columns))

    for chunk in chunker(df, chunksize):
        chunk.to_sql(
            name=table,
            con=con,
            if_exists="append",
            index=False
        )
        
df = pd.DataFrame(np.random.random((10 ** 7, 24)))
df['TextCol'] = "Test Goes Here"
df.head()
print("DataFrame is", round(sys.getsizeof(df) / 1024 ** 2, 1), "MB")
print("DataFrame contains", len(df), "rows by", len(df.columns), "columns")


# Doing it like this errors out. Can't seem to be able to debug the straight pandas call.
# df.to_sql(TABLE, conn_sqlalchemy, index=False, if_exists='replace', method='multi', chunksize=2100)

insert_with_progress(df, conn_executemany, table=TABLE)
Run Code Online (Sandbox Code Playgroud)

关于连接字符串:

  1. f"mssql+pyodbc://如果您想更改为另一种数据库类型,您很可能只需要更改以开头的行
  2. 如果您的 SQL Server 不使用实例名称(例如 SQLSERVERNAME\Instance_Name),那么您可以将实例名称参数设置为空。
  3. 如果您确实使用实例名称,请确保将 \ 保留在变量的开头。
  4. 如果您使用不同的连接字符串,您还需要将上面代码窗口最后一行中的变量名称替换为您的连接字符串名称。

其他提供商的备用包含语句
其中包括:

  • pymssql
  • 涡轮增压器
import pymssql as ms
import sqlalchemy as sql
import sqlalchemy_turbodbc as st
Run Code Online (Sandbox Code Playgroud)

备用连接字符串
感谢DSN 样式字符串,我已将其修改为与用户名/密码一起使用。

conn_sqlalchemy = sql.create_engine(f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")

conn_executemany = sql.create_engine(
    f"mssql+pyodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}", fast_executemany=True
)

conn_turbodbc = sql.create_engine(f"mssql+turbodbc://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}?driver={DRIVER}")

conn_pymssql = sql.create_engine(f"mssql+pymssql://{USERNAME}:{PSSWD}@{SERVERNAME}{INSTANCENAME}/{DB}")
Run Code Online (Sandbox Code Playgroud)