dask分布式数据帧上的慢len函数

Jua*_*oMF 12 python performance dataframe dask

我一直在测试如何使用dask(具有20个内核的集群),我对调用len函数与切片通过loc的速度感到惊讶.

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
Run Code Online (Sandbox Code Playgroud)

任何想法为什么会发生这种情况?len对我来说并不重要,但是我觉得通过不理解这种行为,我不了解图书馆.

在此输入图像描述

所有的绿色框都对应于"from_pandas",而在本文中,Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(len_chunk被称为显着更快,呼叫似乎没有被锁定并等待一个工人完成他的任务,然后启动另一个)

在此输入图像描述

MRo*_*lin 13

好问题,关于数据何时向上移动到集群并返回到客户端(您的python会话),这有几个问题.让我们看看你的计算的几个阶段

使用Pandas加载数据

这是你的python会话中的Pandas数据帧,所以它显然仍然在你的本地进程中.

log = pd.read_csv('800000test', sep='\t')  # on client
Run Code Online (Sandbox Code Playgroud)

转换为惰性Dask.dataframe

这会将您的Pandas数据帧分解为20个Pandas数据帧,但这些仍然在客户端上.Dask数据帧不会急切地将数据发送到群集.

logd = dd.from_pandas(log,npartitions=20)  # still on client
Run Code Online (Sandbox Code Playgroud)

计算len

调用len实际上导致计算在这里(通常你会使用df.some_aggregation().compute().所以现在Dask启动.首先它将数据移出到集群(慢)然后它在所有20个分区(快速)上调用len,它聚合那些(快速)和然后将结果下移到您的客户端,以便它可以打印.

print(len(logd))  # costly roundtrip client -> cluster -> client
Run Code Online (Sandbox Code Playgroud)

分析

所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有它的所有数据.

比如使用本地线程调度程序而不是分布式调度程序要快得多.这应该以毫秒计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client
Run Code Online (Sandbox Code Playgroud)

但是大概你想知道如何扩展到更大的数据集,所以让我们以正确的方式做到这一点.

将数据加载到worker上

不要在客户端/本地会话中加载Pandas,而是让Dask worker加载csv文件的位.这样就不需要客户 - 工作人员通信.

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers
Run Code Online (Sandbox Code Playgroud)

然而,与懒惰不同pd.read_csv,dd.read_csv所以这应该立即返回.我们可以强制Dask实际使用persist方法进行计算

log = client.persist(log)  # triggers computation asynchronously
Run Code Online (Sandbox Code Playgroud)

现在,集群开始行动并直接在工作人员中加载数据.这个比较快.请注意,此方法会在后台工作时立即返回.如果您想等到它完成,请致电wait.

from dask.distributed import wait
wait(log)  # blocks until read is done
Run Code Online (Sandbox Code Playgroud)

如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改blocksize.

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks
Run Code Online (Sandbox Code Playgroud)

无论如何,log现在的操作应该很快

len(log)  # fast
Run Code Online (Sandbox Code Playgroud)

编辑

回答关于这篇博客文章的问题,这里有关于文件所在位置的假设.

通常,当您向其提供文件名时dd.read_csv,假定该文件对所有工作人员都可见.如果您使用的是网络文件系统或S3或HDFS等全局存储,则会出现这种情况.如果您使用的是网络文件系统,那么您将需要使用绝对路径(例如/path/to/myfile.*.csv),或者确保您的工作者和客户端具有相同的工作目录.

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将必须加载并分散它.

简单但次优

简单的方法就是做你原来做的事情,但坚持你的dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers
Run Code Online (Sandbox Code Playgroud)

这很好,但导致通信略微不理想.

复杂但最佳

相反,您可以明确地将数据分散到群集中

[future] = client.scatter([log])
Run Code Online (Sandbox Code Playgroud)

这会进入更复杂的API,所以我只会指出你的文档

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/延迟collections.html