Efficiently upsert a Pandas DataFrame to MS SQL Server using pyodbc

ahh*_*ers 4 python sql sql-server pyodbc pandas

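I'm trying to upsert a Pandas DataFrame to MS SQL Server using pyodbc. I've used a similar approach before for straight inserts, but the solution I've tried this time is incredibly slow. Is there a more streamlined way to accomplish an upsert than what I have?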

import pyodbc

sql_connect = pyodbc.connect('Driver={SQL Server Native Client 11.0}; Server=blank1; Database=blank2; UID=blank3; PWD=blank4')
cursor = sql_connect.cursor()

for index, row in bdf.iterrows():
    res = cursor.execute("UPDATE dbo.MPA_BOOK_RAW SET [SITE]=?, [SHIP_TO]=?, [PROD_LINE]=?, [GROUP_NUMBER]=?, [DESCRIPTION]=?, [ORDER_QTY]=?, [BPS_INCLUDE]=? WHERE [CUST]=? AND [ORDER_NUMBER]=? AND [ORDER_DATE]=? AND [PURCHASE_ORDER]=? AND [CHANNEL]=? AND [ITEM]=? AND [END_DT]=?", 
                    row['SITE'], 
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ORDER_QTY'],
                    row['BPS_INCLUDE'],
                    row['CUST'],
                    row['ORDER_NUMBER'], 
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'], 
                    row['CHANNEL'],
                    row['ITEM'],
                    row['END_DT'])

    if res.rowcount == 0:
            cursor.execute("INSERT INTO dbo.MPA_BOOK_RAW ([SITE], [CUST], [ORDER_NUMBER], [ORDER_DATE], [PURCHASE_ORDER], [CHANNEL], [SHIP_TO], [PROD_LINE], [GROUP_NUMBER], [DESCRIPTION], [ITEM], [ORDER_QTY], [END_DT], [BPS_INCLUDE]) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 
                    row['SITE'], 
                    row['CUST'],
                    row['ORDER_NUMBER'], 
                    row['ORDER_DATE'],
                    row['PURCHASE_ORDER'], 
                    row['CHANNEL'],
                    row['SHIP_TO'],
                    row['PROD_LINE'],
                    row['GROUP_NUMBER'],
                    row['DESCRIPTION'],
                    row['ITEM'],
                    row['ORDER_QTY'],
                    row['END_DT'],
                    row['BPS_INCLUDE'])

    sql_connect.commit()

cursor.close()
sql_connect.close()

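I tried the above approach on a five-row sample of the original ~50k-row DataFrame and it worked fine, so the logic seems to be okay. It's just the speed that's the problem.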
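In the loop above, `sql_connect.commit()` sits inside the `for` body, so every row is committed as its own transaction on top of the one or two round trips for the UPDATE and the possible INSERT. A minimal sketch of the same update-then-insert logic with a single commit at the end is shown below. It uses Python's built-in `sqlite3` module only as a stand-in so it runs without a SQL Server instance; the table `book`, its three columns, and the helper `upsert_rows_one_commit` are hypothetical. Against SQL Server this would still make one or two round trips per row, so it trims the commit overhead rather than removing the row-by-row cost.

```python
import sqlite3


def upsert_rows_one_commit(conn, rows):
    """Row-by-row UPDATE with INSERT fallback; commits once for the whole batch.

    Hypothetical helper. `rows` holds (cust, item, order_qty) tuples.
    Returns the number of rows that had to be inserted.
    """
    cur = conn.cursor()
    inserted = 0
    for cust, item, qty in rows:
        cur.execute(
            "UPDATE book SET order_qty = ? WHERE cust = ? AND item = ?",
            (qty, cust, item),
        )
        if cur.rowcount == 0:  # no existing row has this key, so insert it
            cur.execute(
                "INSERT INTO book (cust, item, order_qty) VALUES (?, ?, ?)",
                (cust, item, qty),
            )
            inserted += 1
    conn.commit()  # single commit instead of one per row
    return inserted


# In-memory SQLite database standing in for SQL Server (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE book (cust TEXT, item TEXT, order_qty INTEGER, "
    "PRIMARY KEY (cust, item))"
)
conn.execute("INSERT INTO book VALUES ('C1', 'A', 1)")
conn.commit()

print(upsert_rows_one_commit(conn, [("C1", "A", 5), ("C2", "B", 7)]))  # 1
print(conn.execute("SELECT * FROM book ORDER BY cust").fetchall())
# [('C1', 'A', 5), ('C2', 'B', 7)]
```

With pyodbc the equivalent change is moving `sql_connect.commit()` out of the loop; pyodbc connections have autocommit off by default, so the statements accumulate in one transaction until that single commit.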

Gor*_*son 5

Comments to the question suggest uploading the DataFrame to a temporary table and then merging the contents into the main table. Note, however, that the documentation for the T-SQL MERGE statement says:

Performance Tip: The conditional behavior described for the MERGE statement works best when the two tables have a complex mixture of matching characteristics. For example, inserting a row if it does not exist, or updating the row if it does match. When simply updating one table based on the rows of another table, improved performance and scalability can be achieved with basic INSERT, UPDATE, and DELETE statements.
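For comparison, a MERGE of the kind the comments suggest could be generated as in the sketch below. `build_merge_sql` is a hypothetical helper that only builds T-SQL text and executes nothing; it assumes a staging table such as `#update_table` whose column names match the target's, and it bracket-quotes identifiers as the question's code does (so names are assumed not to contain `]`).

```python
def build_merge_sql(target, staging, key_cols, value_cols):
    """Return a T-SQL MERGE that upserts rows from `staging` into `target`.

    Hypothetical helper for illustration; it only builds the statement text.
    """
    def q(col):
        return f"[{col}]"  # bracket-quote an identifier

    on = " AND ".join(f"t.{q(c)} = s.{q(c)}" for c in key_cols)
    set_clause = ", ".join(f"{q(c)} = s.{q(c)}" for c in value_cols)
    all_cols = [*key_cols, *value_cols]
    col_list = ", ".join(q(c) for c in all_cols)
    src_list = ", ".join(f"s.{q(c)}" for c in all_cols)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {staging} AS s\n"
        f"    ON {on}\n"
        "WHEN MATCHED THEN\n"
        f"    UPDATE SET {set_clause}\n"
        "WHEN NOT MATCHED BY TARGET THEN\n"
        f"    INSERT ({col_list}) VALUES ({src_list});"  # MERGE must end with ';'
    )


print(build_merge_sql("dbo.MPA_BOOK_RAW", "#update_table",
                      ["CUST", "ORDER_NUMBER"], ["ORDER_QTY"]))
```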

In your case the matching criteria are relatively straightforward - just what is effectively a multi-column primary key - so you could simply use an anonymous code block with an UPDATE statement and an INSERT statement as in the following simplified MCVE code.

Minimum Requirements:

  • Python 3.6+ for f'...' string formatting
  • SQLAlchemy 1.3 for the fast_executemany argument to create_engine
  • DRIVER=ODBC Driver 17 for SQL Server; and UseFMTONLY=Yes; for reliable fast_executemany INSERTs to a SQL Server #temporary table

from pprint import pprint
import sys
import urllib.parse

import pandas as pd
import pyodbc
import sqlalchemy as sa

print(sys.version)
# 3.7.5 (tags/v3.7.5:5c02a39a0b, Oct 15 2019, 00:11:34) [MSC v.1916 64 bit (AMD64)]
print(
    f"SQLAlchemy {sa.__version__}, pandas {pd.__version__}, pyodbc {pyodbc.version}"
)
# SQLAlchemy 1.3.19, pandas 1.1.2, pyodbc 4.0.30

connection_string = (
    r"DRIVER=ODBC Driver 17 for SQL Server;"
    r"SERVER=(local)\SQLEXPRESS;"
    r"DATABASE=myDb;"
    r"Trusted_Connection=Yes;"
    r"UseFMTONLY=Yes;"
)
sqlalchemy_url = "mssql+pyodbc:///?odbc_connect=" + urllib.parse.quote_plus(
    connection_string
)
engine = sa.create_engine(sqlalchemy_url, fast_executemany=True)

with engine.begin() as conn:
    # set up test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS actual_table;"))
    conn.execute(
        sa.text(
            """\
            CREATE TABLE actual_table (
                institution_no VARCHAR(3), 
                transit_no VARCHAR(5), 
                branch_name VARCHAR(50),
                CONSTRAINT PK_actual_table PRIMARY KEY CLUSTERED 
                    (institution_no, transit_no));
            """
        )
    )
    # actual_table initial state
    conn.execute(
        sa.text(
            """\
            INSERT INTO actual_table (institution_no, transit_no, branch_name) VALUES 
                ('002', '45678', 'Scotiabank branch #45678 - *** UPDATE NEEDED ***'),
                ('003', '67890', 'RBC branch #67890 - Sudbury, ON');
            """
        )
    )
    # test data to be updated or inserted
    update_columns = ["institution_no", "transit_no", "branch_name"]
    update_data = [
        ["004", "12345", "TD branch #12345 - London, ON"],
        ["002", "45678", "Scotiabank branch #45678 - Timmins, ON"],
        ["004", "34567", "TD branch #34567 - Toronto, ON"],
    ]
    df_update = pd.DataFrame(update_data, columns=update_columns)

    # Here's where the real work begins ...
    #
    # Step 1: upload update data
    df_update.to_sql("#update_table", conn, index=False)
    #
    # Step 2: perform the "upsert"
    sql = """\
    SET NOCOUNT ON;
    DECLARE @rows_updated INT = 0;
    DECLARE @rows_inserted INT = 0;
    
    UPDATE a SET a.branch_name = u.branch_name
        FROM actual_table a INNER JOIN #update_table u
            ON a.institution_no = u.institution_no 
                AND a.transit_no = u.transit_no;
    SELECT @rows_updated = @@ROWCOUNT;
    
    INSERT INTO actual_table (institution_no, transit_no, branch_name)
        SELECT institution_no, transit_no, branch_name
        FROM #update_table u
        WHERE NOT EXISTS (
            SELECT * FROM actual_table
            WHERE institution_no = u.institution_no
                AND transit_no = u.transit_no
        );
    SELECT @rows_inserted = @@ROWCOUNT;
    
    SELECT @rows_updated AS rows_updated, @rows_inserted AS rows_inserted;
    """
    result = conn.execute(sa.text(sql)).fetchone()
    print(f"{result[0]} row(s) updated, {result[1]} row(s) inserted")
    # 1 row(s) updated, 2 row(s) inserted

# verify results
with engine.begin() as conn:
    pprint(conn.execute(sa.text("SELECT * FROM actual_table")).fetchall())
    """console output:
    [('002', '45678', 'Scotiabank branch #45678 - Timmins, ON'),
     ('003', '67890', 'RBC branch #67890 - Sudbury, ON'),
     ('004', '12345', 'TD branch #12345 - London, ON'),
     ('004', '34567', 'TD branch #34567 - Toronto, ON')]
    """
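To try the two-statement logic without a SQL Server instance, the sketch below replays the same test case using Python's built-in `sqlite3` module as a stand-in. This is an assumed substitute, not the T-SQL above: SQLite has no `#temporary` tables and older SQLite versions lack `UPDATE ... FROM`, so it uses a `TEMP` table and a correlated-subquery UPDATE, and `executemany` takes the place of `to_sql` with `fast_executemany` for loading the staging table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# actual_table initial state (same rows as the MCVE)
cur.execute(
    "CREATE TABLE actual_table (institution_no TEXT, transit_no TEXT, "
    "branch_name TEXT, PRIMARY KEY (institution_no, transit_no))"
)
cur.executemany(
    "INSERT INTO actual_table VALUES (?, ?, ?)",
    [
        ("002", "45678", "Scotiabank branch #45678 - *** UPDATE NEEDED ***"),
        ("003", "67890", "RBC branch #67890 - Sudbury, ON"),
    ],
)

update_data = [
    ("004", "12345", "TD branch #12345 - London, ON"),
    ("002", "45678", "Scotiabank branch #45678 - Timmins, ON"),
    ("004", "34567", "TD branch #34567 - Toronto, ON"),
]

# Step 1: bulk-load the incoming rows into a temporary staging table.
cur.execute("CREATE TEMP TABLE update_table "
            "(institution_no TEXT, transit_no TEXT, branch_name TEXT)")
cur.executemany("INSERT INTO update_table VALUES (?, ?, ?)", update_data)

# Step 2a: update target rows whose key exists in the staging table.
cur.execute("""
    UPDATE actual_table
    SET branch_name = (
        SELECT u.branch_name FROM update_table u
        WHERE u.institution_no = actual_table.institution_no
          AND u.transit_no = actual_table.transit_no)
    WHERE EXISTS (
        SELECT 1 FROM update_table u
        WHERE u.institution_no = actual_table.institution_no
          AND u.transit_no = actual_table.transit_no)
""")
rows_updated = cur.rowcount

# Step 2b: insert staging rows whose key is not in the target yet.
cur.execute("""
    INSERT INTO actual_table (institution_no, transit_no, branch_name)
    SELECT institution_no, transit_no, branch_name FROM update_table u
    WHERE NOT EXISTS (
        SELECT 1 FROM actual_table a
        WHERE a.institution_no = u.institution_no
          AND a.transit_no = u.transit_no)
""")
rows_inserted = cur.rowcount
conn.commit()

print(f"{rows_updated} row(s) updated, {rows_inserted} row(s) inserted")
# 1 row(s) updated, 2 row(s) inserted
```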
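The MCVE hard-codes three columns. For a wider table such as the question's `dbo.MPA_BOOK_RAW` (seven key columns in the WHERE clause and seven value columns in the SET list), one option is to generate the same UPDATE and INSERT statements from column lists. `build_upsert_sql` below is a hypothetical sketch that only produces T-SQL text following the answer's pattern; it assumes the staging table uses the target's column names and that the key columns uniquely identify a row.

```python
def build_upsert_sql(target, staging, key_cols, value_cols):
    """Return a T-SQL batch: UPDATE matching rows, then INSERT missing ones.

    Hypothetical helper mirroring the answer's two-statement pattern.
    Identifiers are bracket-quoted and assumed not to contain ']'.
    """
    def q(col):
        return f"[{col}]"

    join = " AND ".join(f"a.{q(c)} = u.{q(c)}" for c in key_cols)
    set_clause = ", ".join(f"a.{q(c)} = u.{q(c)}" for c in value_cols)
    all_cols = ", ".join(q(c) for c in [*key_cols, *value_cols])
    return (
        "SET NOCOUNT ON;\n"
        f"UPDATE a SET {set_clause}\n"
        f"    FROM {target} a INNER JOIN {staging} u ON {join};\n"
        f"INSERT INTO {target} ({all_cols})\n"
        f"    SELECT {all_cols} FROM {staging} u\n"
        f"    WHERE NOT EXISTS (SELECT * FROM {target} a WHERE {join});"
    )


keys = ["CUST", "ORDER_NUMBER", "ORDER_DATE", "PURCHASE_ORDER",
        "CHANNEL", "ITEM", "END_DT"]
values = ["SITE", "SHIP_TO", "PROD_LINE", "GROUP_NUMBER",
          "DESCRIPTION", "ORDER_QTY", "BPS_INCLUDE"]
print(build_upsert_sql("dbo.MPA_BOOK_RAW", "#update_table", keys, values))
```

With the SQLAlchemy setup from the answer, the generated text would be run with `conn.execute(sa.text(...))` inside the same `engine.begin()` block that called `to_sql`, because a local `#temporary` table is visible only to the session that created it.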