Dav*_*son 5 sql-server sqlalchemy pandas dask dask-to-sql
我的最终目标是将 SQL/Python 一起用于一个有太多数据供熊猫处理的项目(至少在我的机器上)。所以,我去了dask
:
对于 #1 和 #2,它们需要大约 30 秒才能使用最少的内存来执行(几个 SQL 查询大约 200 行代码使用 dask 操作大型数据集)。又快又好玩!!!
但是,上面的#3 一直是主要的瓶颈。在(1. 内存和 2. 速度(执行时间))方面,使用 dask 或其他替代方法完成 #3 的一些有效方法是什么?查看更多背景,以及我尝试过的内容和得出的一些结论。
对于上面的#1、#2 和#3,由于内存限制/执行时间长,这是我发现用熊猫无法完成的任务,但是dask
用出色的颜色解决了上面的#1 和#2,但我仍然与#3 苦苦挣扎——以自动方式将数据返回到 SQL 表中,我没有发送到 .csv 然后导入到 SQL Server。我试图.compute()
将 dask 数据帧转换为 Pandas 数据帧,然后写入to_sql
,但是这种方式违背了使用 dask 读取/数据模型的目的,并且再次耗尽内存/无论如何都要永远执行。
因此,新计划是to_csv
每天生成一个新的 .csv 并使用查询将数据批量插入到表中。我认为这仍然是一个可行的解决方案;但是,今天,我很高兴地发现 dask 发布了一个新to_sql
功能(https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql)。利用有关此主题的现有 StackOverflow 文章/博客(例如来自 Francois Leblanc - https://leblancfg.com/benchmarks_writing_pandas_dataframe_SQL_Server.html),我修改了所有参数以找到执行时间最快的最有效组合(这在您编写大型数据集时非常重要)报告日)。这是我发现的,这类似于很多关于pd.to_sql
包括 Leblanc 的帖子:
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
Run Code Online (Sandbox Code Playgroud)
使用以下非默认参数的任意组合会减慢我的执行时间to_sql
(再次与 LeBlanc 在他的博客中提到的一致):
chunksize=40
(40 是我可以根据 2098 SQL Server 参数限制为 52 列传递的最大值),method='multi'
,parallel=True
)注意:我意识到除了(或代替)传递之外chunksize=40
,我还可以遍历我的 33 个 dask 数据帧分区并to_sql
单独处理每个块。这将提高内存效率,也可能更快。一个分区需要 45 秒到 1 分钟,而所有分区一次完成整个 dask 数据帧需要 > 1 小时。我将尝试遍历所有分区并发布更新(如果速度更快)。一个小时似乎很多,但在尝试使用 Pandas 计算时,我感觉完全受阻,需要整夜或内存不足,所以这是一个 STEP UP。老实说,我对此很满意,现在可能要构建一个 .exepyinstaller
并让 .exe 每天运行,以便完全自动化并从那里开始,但我认为这对其他人有帮助,因为我在过去几周一直在努力解决各种解决方案。
我测试了通过循环将数据帧写入分区中的 SQL Server,而不是一次全部写入,完成所有内容的时间与一次性写入所有内容的时间相似。
import sqlalchemy as sa
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()
#windows authentication + fast_executemany=True
to_sql_uri = sa.create_engine(f'mssql://@{server}/{database}?trusted_connection=yes&driver={driver_name}', fast_executemany=True)
# From my question, I have replaced the commented out line of code with everything below that to see if there was a significant increase in speed. There was not. It was about the same as the cod in the question.
# ddf.to_sql('PowerBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
for i in range(ddf.npartitions):
partition = ddf.get_partition(i)
if i == 0:
partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='replace', index=False)
if i > 0:
partition.to_sql('CDR_PBI_Report', uri=to_sql_uri, if_exists='append', index=False)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2126 次 |
最近记录: |