使用 Dask 的新 to_sql 提高效率(内存/速度)或替代从 dask 数据帧获取数据到 SQL Server 表

Dav*_*son 5 sql-server sqlalchemy pandas dask dask-to-sql

我的最终目标是将 SQL/Python 一起用于一个有太多数据供熊猫处理的项目(至少在我的机器上)。所以,我去了dask

  1. 从多个来源(主要是 SQL Server 表/视图)读入数据
  2. 将数据操作/合并到一个包含约 1000 万行和 52 列的大型 dask 数据框表中,其中一些具有一些长的唯一字符串
  3. 每天将其写回 SQL Server,以便我的 PowerBI 报表可以自动刷新数据。

对于 #1 和 #2,它们需要大约 30 秒才能使用最少的内存来执行(几个 SQL 查询大约 200 行代码使用 dask 操作大型数据集)。又快又好玩!!!

但是,上面的#3 一直是主要的瓶颈。在(1. 内存和 2. 速度(执行时间))方面,使用 dask 或其他替代方法完成 #3 的一些有效方法是什么?查看更多背景,以及我尝试过的内容和得出的一些结论。


对于上面的#1、#2 和#3,由于内存限制/执行时间长,这是我发现用熊猫无法完成的任务,但是dask用出色的颜色解决了上面的#1 和#2,但我仍然与#3 苦苦挣扎——以自动方式将数据返回到 SQL 表中,我没有发送到 .csv 然后导入到 SQL Server。我试图.compute()将 dask 数据帧转换为 Pandas 数据帧,然后写入to_sql,但是这种方式违背了使用 dask 读取/数据模型的目的,并且再次耗尽内存/无论如何都要永远执行。

因此,新计划是to_csv每天生成一个新的 .csv 并使用查询将数据批量插入到表中。我认为这仍然是一个可行的解决方案;但是,今天,我很高兴地发现 dask 发布了一个新to_sql功能(https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql)。利用有关此主题的现有 StackOverflow 文章/博客(例如来自 Francois Leblanc - https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html),我修改了所有参数以找到执行时间最快的最有效组合(这在您编写大型数据集时非常重要)报告日)。这是我发现的,这类似于很多关于pd.to_sql包括 Leblanc 的帖子:

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
Run Code Online (Sandbox Code Playgroud)

使用以下非默认参数的任意组合会减慢我的执行时间to_sql(再次与 LeBlanc 在他的博客中提到的一致):

  1. chunksize=40 (40 是我可以根据 2098 SQL Server 参数限制为 52 列传递的最大值),
  2. method='multi',
  3. parallel=True)

注意:我意识到除了(或代替)传递之外chunksize=40,我还可以遍历我的 33 个 dask 数据帧分区并to_sql单独处理每个块。这将提高内存效率,也可能更快。一个分区需要 45 秒到 1 分钟,而所有分区一次完成整个 dask 数据帧需要 > 1 小时。我将尝试遍历所有分区并发布更新(如果速度更快)。一个小时似乎很多,但在尝试使用 Pandas 计算时,我感觉完全受阻,需要整夜或内存不足,所以这是一个 STEP UP。老实说,我对此很满意,现在可能要构建一个 .exepyinstaller 并让 .exe 每天运行,以便完全自动化并从那里开始,但我认为这对其他人有帮助,因为我在过去几周一直在努力解决各种解决方案。

Dav*_*son 1

我测试了通过循环将数据帧写入分区中的 SQL Server,而不是一次全部写入,完成所有内容的时间与一次性写入所有内容的时间相似。

import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
# From my question, I have replaced the commented out line of code with everything below that to see if there was a significant increase in speed. There was not. It was about the same as the cod in the question.
# ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
for i in range(ddf.npartitions):
    partition = ddf.get_partition(i)
    if i == 0:
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
    if i > 0:
        partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)