我是Dask的新手并且遇到了一些麻烦.
我正在使用一台机器(4GB内存,2个内核)来分析两个csv文件(key.csv:约200万行,约300Mb,sig.csv:~1200万行,约600Mb).有了这些数据,大熊猫无法放入内存中,所以我切换到使用Dask.dataframe,我期望Dask会处理可以放入内存的小块内容(速度可能会慢一些,我不喜欢但是,只要它有效,我就不介意了,但是,不知怎的,Dask仍然耗尽了所有的记忆.
我的代码如下:
key=dd.read_csv("key.csv")
sig=dd.read_csv("sig.csv")
merge=dd.merge(key,sig,left_on["tag","name"],
right_on["key_tag","query_name"],how="inner")
merge.to_csv("test2903_*.csv")
# store results into a hard disk since it cant be fit in memory
Run Code Online (Sandbox Code Playgroud)
我犯了什么错误?任何帮助表示赞赏.
大的 CSV 文件通常不适合 Dask 等分布式计算引擎。在此示例中,CSV 为 600MB 和 300MB,这并不大。正如评论中所指定的,您可以设置blocksize在读取 CSV 时设置 ,以确保 CSV 被读入具有正确数量分区的 Dask DataFrame 中。
当您可以在运行连接之前广播小型 DataFrame 时,分布式计算连接总是会运行得更快。您的机器有 4GB RAM,小型 DataFrame 为 300MB,因此它足够小,可以进行广播。Dask 自动广播 Pandas DataFrame。您可以将 Dask DataFrame 转换为 Pandas DataFramecompute()。
key是您示例中的小 DataFrame 。在广播之前对小 DataFrame 进行列修剪并使其变得更小效果更好。
key=dd.read_csv("key.csv")\nsig=dd.read_csv("sig.csv", blocksize="100 MiB")\n\nkey_pdf = key.compute()\n \nmerge=dd.merge(key_pdf, sig, left_on=["tag","name"],\n right_on=["key_tag","query_name"], how="inner")\nmerge.to_csv("test2903_*.csv")\nRun Code Online (Sandbox Code Playgroud)\n这是一个 MVCE:
\nimport dask.dataframe as dd\nimport pandas as pd\n\ndf = pd.DataFrame(\n {\n "id": [1, 2, 3, 4],\n "cities": ["Medell\xc3\xadn", "Rio", "Bogot\xc3\xa1", "Buenos Aires"],\n }\n)\nlarge_ddf = dd.from_pandas(df, npartitions=2)\n\nsmall_df = pd.DataFrame(\n {\n "id": [1, 2, 3, 4],\n "population": [2.6, 6.7, 7.2, 15.2],\n }\n)\n\nmerged_ddf = dd.merge(\n large_ddf,\n small_df,\n left_on=["id"],\n right_on=["id"],\n how="inner",\n)\n\nprint(merged_ddf.compute())\n\n id cities population\n0 1 Medell\xc3\xadn 2.6\n1 2 Rio 6.7\n0 3 Bogot\xc3\xa1 7.2\n1 4 Buenos Aires 15.2\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
635 次 |
| 最近记录: |