Dask.dataframe:合并和groupby时内存不足

Tho*_*uoc 7 dask

我是Dask的新手并且遇到了一些麻烦.

我正在使用一台机器(4GB内存,2个内核)来分析两个csv文件(key.csv:约200万行,约300Mb,sig.csv:~1200万行,约600Mb).有了这些数据,大熊猫无法放入内存中,所以我切换到使用Dask.dataframe,我期望Dask会处理可以放入内存的小块内容(速度可能会慢一些,我不喜欢但是,只要它有效,我就不介意了,但是,不知怎的,Dask仍然耗尽了所有的记忆.

我的代码如下:

key=dd.read_csv("key.csv")
sig=dd.read_csv("sig.csv")

merge=dd.merge(key,sig,left_on["tag","name"],
    right_on["key_tag","query_name"],how="inner")
merge.to_csv("test2903_*.csv") 
# store results into  a hard disk since it cant be fit in memory
Run Code Online (Sandbox Code Playgroud)

我犯了什么错误?任何帮助表示赞赏.

Pow*_*ers 0

大的 CSV 文件通常不适合 Dask 等分布式计算引擎。在此示例中,CSV 为 600MB 和 300MB,这并不大。正如评论中所指定的,您可以设置blocksize在读取 CSV 时设置 ,以确保 CSV 被读入具有正确数量分区的 Dask DataFrame 中。

\n

当您可以在运行连接之前广播小型 DataFrame 时,分布式计算连接总是会运行得更快。您的机器有 4GB RAM,小型 DataFrame 为 300MB,因此它足够小,可以进行广播。Dask 自动广播 Pandas DataFrame。您可以将 Dask DataFrame 转换为 Pandas DataFramecompute()

\n

key是您示例中的小 DataFrame 。在广播之前对小 DataFrame 进行列修剪并使其变得更小效果更好。

\n
key=dd.read_csv("key.csv")\nsig=dd.read_csv("sig.csv", blocksize="100 MiB")\n\nkey_pdf = key.compute()\n  \nmerge=dd.merge(key_pdf, sig, left_on=["tag","name"],\n        right_on=["key_tag","query_name"], how="inner")\nmerge.to_csv("test2903_*.csv")\n
Run Code Online (Sandbox Code Playgroud)\n

这是一个 MVCE:

\n
import dask.dataframe as dd\nimport pandas as pd\n\ndf = pd.DataFrame(\n    {\n        "id": [1, 2, 3, 4],\n        "cities": ["Medell\xc3\xadn", "Rio", "Bogot\xc3\xa1", "Buenos Aires"],\n    }\n)\nlarge_ddf = dd.from_pandas(df, npartitions=2)\n\nsmall_df = pd.DataFrame(\n    {\n        "id": [1, 2, 3, 4],\n        "population": [2.6, 6.7, 7.2, 15.2],\n    }\n)\n\nmerged_ddf = dd.merge(\n    large_ddf,\n    small_df,\n    left_on=["id"],\n    right_on=["id"],\n    how="inner",\n)\n\nprint(merged_ddf.compute())\n\n   id        cities  population\n0   1      Medell\xc3\xadn         2.6\n1   2           Rio         6.7\n0   3        Bogot\xc3\xa1         7.2\n1   4  Buenos Aires        15.2\n
Run Code Online (Sandbox Code Playgroud)\n