使用map_partitions和pd.df.to_sql从dask数据框创建sql表

Lud*_*udo 7 python postgresql pandas dask pandas-to-sql

Dask没有像pandas这样的df.to_sql(),因此我正在尝试复制该功能并使用该map_partitions方法创建sql表。这是我的代码:

import dask.dataframe as dd
import pandas as pd
import sqlalchemy_utils as sqla_utils

db_url = 'my_db_url_connection'
conn = sqla.create_engine(db_url)

ddf = dd.read_csv('data/prod.csv')
meta=dict(ddf.dtypes)
ddf.map_partitions(lambda df: df.to_sql('table_name', db_url, if_exists='append',index=True), ddf, meta=meta)
Run Code Online (Sandbox Code Playgroud)

这将返回我的dask dataframe对象,但是当我查看我的psql服务器时,没有新表...这里出了什么问题?

UPDATE 仍然无法使其正常工作,但是由于独立问题。后续问题:重复的键值违反唯一约束-尝试从dask数据帧创建sql表时出现postgres错误

mdu*_*ant 5

简而言之,您已经创建了一个数据框,它是要完成的工作的规定,但尚未执行。要执行,您需要调用.compute()结果。

请注意,这里的输出并不是真正的数据帧,每个分区的求值结果None(因为to_sql没有输出),因此使用来表达它可能会更干净df.to_delayed一些,例如

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)
Run Code Online (Sandbox Code Playgroud)

还要注意,是否获得良好的并行性将取决于数据库驱动程序和数据系统本身。