使用pyODBC的fast_executemany加速pandas.DataFrame.to_sql

J.K*_*.K. 43 python sqlalchemy pyodbc pandas-to-sql

我想发送一个大型pandas.DataFrame到运行MS SQL的远程服务器.我现在的方法是将data_frame对象转换为元组列表,然后使用pyODBC的executemany()函数将其发送出去.它是这样的:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()
Run Code Online (Sandbox Code Playgroud)

然后我开始怀疑使用data_frame.to_sql()方法是否可以加速(或至少更具可读性).我想出了以下解决方案:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)
Run Code Online (Sandbox Code Playgroud)

现在代码更具可读性,但上传速度至少慢150倍 ......

有没有办法fast_executemany在使用SQLAlchemy时翻转?

我正在使用pandas-0.20.3,pyODBC-4.0.21和sqlalchemy-1.1.13.

het*_*jee 53

刚刚发了一个帐号来发布这个.我想在上面的帖子下面发表评论,因为它是已经提供的答案的后续内容.上面的解决方案适用于使用基于Ubuntu的安装的Microsft SQL存储上的Version 17 SQL驱动程序.

我用来显着提高速度的完整代码(谈话> 100倍加速)低于.这是一个交钥匙片段,前提是您可以使用相关详细信息更改连接字符串.在上面的海报中,非常感谢你的解决方案,因为我已经看了很长时间了.

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)
Run Code Online (Sandbox Code Playgroud)

根据下面的评论,我想花一些时间来解释一些关于pandas engine = create_engine(sqlalchemy_url, fast_executemany=True)实现和查询处理方式的限制.有两件事可能导致mssql+pyodbc被提出的事情:

1)假设您正在写入远程SQL存储.当您尝试使用该@event.listens_for(engine, 'before_cursor_execute')方法编写大型pandas DataFrame时,它会将整个数据帧转换为值列表.这种转换占用了比原始DataFrame更多的RAM(在它之上,因为旧的DataFrame仍然存在于RAM中).此列表提供给to_sqlODBC连接器的最终调用.我认为ODBC连接器在处理如此大的查询时遇到了一些麻烦.解决这个问题的MemoryError一种方法是为方法提供一个chunksize参数(10**5似乎是最佳的,在Azure的2 CPU 7GB ram MSSQL存储应用程序上提供大约600 mbit/s(!)的写入速度 - 不能推荐Azure btw).因此,第一个限制是查询大小,可以通过提供to_sql参数来规避.但是,这不会使您编写大小为10**7或更大的数据帧(至少不在我正在使用的具有~55GB RAM的VM上),因为发布nr 2.

这可以通过使用executemany(10**6大小的DataFrame块)拆分DataFrame来规避这些.这些可以迭代地写出.当我to_sql为pandas本身的核心方法准备好解决方案时,我会尝试发出一个pull请求,这样你就不必每次都预先分解.无论如何,我最终写了一个类似的功能(不是交钥匙)到下面:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True
Run Code Online (Sandbox Code Playgroud)

可以在此处查看上述代码段的更完整示例:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

这是我编写的一个包含补丁的类,并简化了与SQL建立连接所带来的一些必要开销.还是要写一些文档.此外,我还计划为大熊猫本身提供补丁,但还没有找到一个很好的方法来解决这个问题.

我希望这有帮助.

  • @hetspookjee-由于这是迄今为止最受欢迎的答案,因此请考虑对其进行更新以提及SQLAlchemy 1.3.0,于2019-03-04发布,现在为`mssql支持`engine = create_engine(sqlalchemy_url,fast_executemany = True)` + pyodbc`方言。也就是说,不再需要定义一个函数并使用@ event.listens_for(engine,'before_cursor_execute')`。谢谢。 (5认同)
  • 不知道为什么我之前没有分享过这个,但这是我经常用于从 SQL 数据库中获取数据帧的类:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector .py 享受吧! (2认同)
  • @erickfis我用一个正确的例子更新了这个类.请注意,并非每个数据库都使用相同的驱动程序,因此在使用此类时会引发错误.不使用它的示例数据库是PostgreSQL.我还没有找到一种将数据插入PSQL的快速方法.仍然像这样使用这个类的一种方法是通过调用以下方式显式关闭开关:`con._init_engine(SET_FAST_EXECUTEMANY_SWITCH = False)`初始化类之后.祝好运. (2认同)

J.K*_*.K. 22

在联系了SQLAlchemy的开发人员之后,出现了一种解决这个问题的方法.非常感谢他们的出色工作!

必须使用游标执行事件并检查是否executemany已引发该标志.如果确实如此,请打开fast_executemany选项.例如:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True
Run Code Online (Sandbox Code Playgroud)

有关执行事件的更多信息,请访问此处.

  • @JK - 请考虑更新您的答案,提及 2019 年 3 月 4 日发布的 SQLAlchemy 1.3.0 现在支持 `mssql+pyodbc` 方言的 `engine = create_engine(sqlalchemy_url, fast_executemany=True)`。即,不再需要定义函数并使用`@event.listens_for(engine, 'before_cursor_execute')`。谢谢。 (3认同)
  • 非常感谢你做这方面的工作.为了清楚起见,应该在实例化SQLAlchemy引擎之前声明这个装饰器和函数? (2认同)
  • 非常欢迎你.我在类的构造函数中实例化引擎后声明它. (2认同)
  • 我尝试在函数后直接调用`to_sql`,但它没有加快速度 (2认同)

Emm*_*uel 14

我在使用PostgreSQL时遇到了同样的问题。现在,他们刚刚发布了熊猫版本0.24.0,并且该to_sql函数中有一个新参数method可以解决我的问题。

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")
Run Code Online (Sandbox Code Playgroud)

对我来说,上传速度快100倍。chunksize如果要发送大量数据,我还建议设置该参数。

  • 根据 https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#io-sql-method,设置 `method='multi'` 可能会在加载到传统 RDBMS 时减慢插入速度具有许多列的表,但在处理宽表时,对于 Redshift 等环境往往更有用。 (5认同)

Pyl*_*der 9

我只想将这个完整的示例发布为可以使用新turbodbc库的人的另一个高性能选项:http://turbodbc.readthedocs.io/en/latest/

在pandas .to_sql(),通过sqlalchemy触发fast_executemany,使用pyodbc直接使用tuples/lists /等,甚至尝试BULK UPLOAD与平面文件之间有很多选择.

希望以下可能会使当前的熊猫项目中的功能发展变得更加愉快,或者包括未来的turbodbc集成.

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]
Run Code Online (Sandbox Code Playgroud)

在很多用例中,turbodbc应该非常快(特别是对于numpy数组).请注意直接将基础numpy数组从dataframe列作为参数传递给查询是多么简单.我也相信这有助于防止创建过度刺激内存消耗的中间对象.希望这有用!

  • @erickfis 太棒了!我很高兴您最终发现它值得满足您的需求,并感谢您链接您精彩的演示帖子。它应该有助于推广这个答案,并提高turbodbc项目对寻求解决方案的人们的影响。 (2认同)

Ilj*_*ilä 6

似乎Pandas 0.23.0和0.24.0在PyODBC中使用了多值插入,这阻止了快速执行的帮助- INSERT ... VALUES ...每个块发出一个语句。多值插入块是对旧的慢速执行默认设置的改进,但至少在简单测试中,快速执行方法仍然占主导地位,更不用说chunksize像多值插入那样不需要手动计算了。如果将来不提供任何配置选项,则可以通过monkeypatching来强制执行旧的行为:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement
Run Code Online (Sandbox Code Playgroud)

未来在这里和至少在master分支可以使用关键字参数来控制所述插入方法method=to_sql()。默认为None,这将强制执行executemany方法。通过method='multi'使用多值插入来传递结果。它甚至可以用于实现DBMS特定的方法,例如Postgresql COPY

  • 尚未检查当前情况如何,但它已在0.24.0版本中放回。编辑:它仍然至少在master分支中存在,但是现在是可控的:https://github.com/pandas-dev/pandas/blob/master/pandas/io/sql.py#L1157。好像传递`to_sql(...,method = None)`应该强制执行方法。 (2认同)
  • ...默认为“无”。 (2认同)

eri*_*fis 5

正如@Pylander 指出的那样

到目前为止,Turbodbc 是数据摄取的最佳选择!

我对此感到非常兴奋,以至于我在我的 github 和媒体上写了一篇关于它的“博客”:请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

有关工作示例并与 pandas.to_sql 进行比较

长话短说,

使用 turbodbc 我在 3 秒内得到 10000 行(77 列)

使用 pandas.to_sql 我在 198 秒内得到了相同的 10000 行(77 列)...

这是我正在做的全部细节

进口:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
Run Code Online (Sandbox Code Playgroud)

加载并处理一些数据 - 用我的 sample.pkl 代替你的:

df = pd.read_pickle('sample.pkl')

df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...
Run Code Online (Sandbox Code Playgroud)

使用 sqlAlchemy 创建表

不幸的是,turbodbc 需要大量开销和大量 sql 手工劳动,用于创建表和在其上插入数据。

幸运的是,Python 是纯粹的乐趣,我们可以自动化编写 sql 代码的这个过程。

第一步是创建将接收我们的数据的表。但是,如果您的表有多于几列,则手动编写 sql 代码创建表可能会出现问题。就我而言,表格通常有 240 列!

这就是 sqlAlchemy 和 pandas 仍然可以帮助我们的地方:pandas 不适合写入大量行(在此示例中为 10000),但是只有 6 行,即表头呢?通过这种方式,我们可以自动化创建表的过程。

创建 sqlAlchemy 连接:

mydb = 'someDB'

def make_con(db):
    """Connect to a specified db."""
    database_connection = sqlalchemy.create_engine(
        'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
            myuser, mypassword,
            myhost, db
            )
        )
    return database_connection

pd_connection = make_con(mydb)
Run Code Online (Sandbox Code Playgroud)

在 SQL Server 上创建表

使用 pandas + sqlAlchemy,但只是为了为前面提到的 turbodbc 准备空间。请注意这里的 df.head():我们使用 pandas + sqlAlchemy 仅插入 6 行数据。这将运行得非常快,并且正在完成以自动化表创建。

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)
Run Code Online (Sandbox Code Playgroud)

现在桌子已经就位,让我们认真对待。

Turbodbc 连接:

def turbo_conn(mydb):
    """Connect to a specified db - turbo."""
    database_connection = turbodbc.connect(
                                            driver='ODBC Driver 17 for SQL Server',
                                            server=myhost,
                                            database=mydb,
                                            uid=myuser,
                                            pwd=mypassword
                                        )
    return database_connection
Run Code Online (Sandbox Code Playgroud)

为 turbodbc 准备 sql 命令和数据。让我们自动化这个创造性的代码创建:

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    colunas = '('
    colunas += ', '.join(df.columns)
    colunas += ')'

    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'

    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {colunas}
    VALUES {sql_val}
    """

    # writing array of values for turbodbc
    valores_df = [df[col].values for col in df.columns]

    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()

    # inserts data, for real
    with connection.cursor() as cursor:
        try:
            cursor.executemanycolumns(sql, valores_df)
            connection.commit()
        except Exception:
            connection.rollback()
            print('something went wrong')

    stop = time.time() - start
    return print(f'finished in {stop} seconds')
Run Code Online (Sandbox Code Playgroud)

使用 turbodbc 写入数据 - 我在 3 秒内有 10000 行(77 列):

turbo_write(mydb, df.sample(10000), table)
Run Code Online (Sandbox Code Playgroud)

Pandas 方法比较 - 我在 198 秒内得到了相同的 10000 行(77 列)......

table = 'pd_testing'

def pandas_comparisson(df, table):
    """Load data using pandas."""
    start = time.time()
    df.to_sql(table, con=pd_connection, index=False)
    stop = time.time() - start
    return print(f'finished in {stop} seconds')

pandas_comparisson(df.sample(10000), table)
Run Code Online (Sandbox Code Playgroud)

环境条件

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges
Run Code Online (Sandbox Code Playgroud)

请检查https://erickfis.github.io/loose-code/以获取此代码中的更新!


Gor*_*son 5

SQL Server INSERT 性能:pyodbc 与 turbodbc

to_sql用于将 Pandas DataFrame 上传到 SQL Server 时,turbodbc 肯定会比没有fast_executemany. 但是,fast_executemany启用 pyodbc 后,两种方法产生的性能基本相同。

测试环境:

[venv1_pyodbc]
pyodbc 2.0.25

[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[两者通用]
Windows 上的 Python 3.6.4 64 位
SQLAlchemy 1.3.0b1
pandas 0.23.4
numpy 1.15.4

测试代码:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
Run Code Online (Sandbox Code Playgroud)

测试对每个环境运行十二 (12) 次,丢弃每个环境的单个最佳和最差时间。结果(以秒为单位):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0
Run Code Online (Sandbox Code Playgroud)