Jua*_*oMF 12 python performance dataframe dask
我一直在测试如何使用dask(具有20个内核的集群),我对调用len函数与切片通过loc的速度感到惊讶.
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
任何想法为什么会发生这种情况?len对我来说并不重要,但是我觉得通过不理解这种行为,我不了解图书馆.
所有的绿色框都对应于"from_pandas",而在本文中,Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes调用图看起来更好(len_chunk被称为显着更快,呼叫似乎没有被锁定并等待一个工人完成他的任务,然后启动另一个)
MRo*_*lin 13
好问题,关于数据何时向上移动到集群并返回到客户端(您的python会话),这有几个问题.让我们看看你的计算的几个阶段
这是你的python会话中的Pandas数据帧,所以它显然仍然在你的本地进程中.
log = pd.read_csv('800000test', sep='\t')  # on client
这会将您的Pandas数据帧分解为20个Pandas数据帧,但这些仍然在客户端上.Dask数据帧不会急切地将数据发送到群集.
logd = dd.from_pandas(log,npartitions=20)  # still on client
调用len实际上导致计算在这里(通常你会使用df.some_aggregation().compute().所以现在Dask启动.首先它将数据移出到集群(慢)然后它在所有20个分区(快速)上调用len,它聚合那些(快速)和然后将结果下移到您的客户端,以便它可以打印.
print(len(logd))  # costly roundtrip client -> cluster -> client
所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有它的所有数据.
比如使用本地线程调度程序而不是分布式调度程序要快得多.这应该以毫秒计算
with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client
但是大概你想知道如何扩展到更大的数据集,所以让我们以正确的方式做到这一点.
不要在客户端/本地会话中加载Pandas,而是让Dask worker加载csv文件的位.这样就不需要客户 - 工作人员通信.
# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers
然而,与懒惰不同pd.read_csv,dd.read_csv所以这应该立即返回.我们可以强制Dask实际使用persist方法进行计算
log = client.persist(log)  # triggers computation asynchronously
现在,集群开始行动并直接在工作人员中加载数据.这个比较快.请注意,此方法会在后台工作时立即返回.如果您想等到它完成,请致电wait.
from dask.distributed import wait
wait(log)  # blocks until read is done
如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改blocksize.
log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks
无论如何,log现在的操作应该很快
len(log)  # fast
回答关于这篇博客文章的问题,这里有关于文件所在位置的假设.
通常,当您向其提供文件名时dd.read_csv,假定该文件对所有工作人员都可见.如果您使用的是网络文件系统或S3或HDFS等全局存储,则会出现这种情况.如果您使用的是网络文件系统,那么您将需要使用绝对路径(例如/path/to/myfile.*.csv),或者确保您的工作者和客户端具有相同的工作目录.
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将必须加载并分散它.
简单的方法就是做你原来做的事情,但坚持你的dask.dataframe
log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers
这很好,但导致通信略微不理想.
相反,您可以明确地将数据分散到群集中
[future] = client.scatter([log])
这会进入更复杂的API,所以我只会指出你的文档
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/延迟collections.html
| 归档时间: | 
 | 
| 查看次数: | 2548 次 | 
| 最近记录: |