I am using Dask to distribute the computation of some functions. My general layout looks like this:
from dask.distributed import Client, LocalCluster, as_completed

cluster = LocalCluster(
    processes=config.use_dask_local_processes,
    n_workers=1,
    threads_per_worker=1,
)
client = Client(cluster)
cluster.scale(config.dask_local_worker_instances)

fcast_futures = []
# For each group do work
for group in groups:
    fcast_futures.append(client.submit(_work, group))
# Wait till the work is done
for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)
My problem is that for large numbers of jobs I tend to hit the memory limit. I see a lot of:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 1.15 GB -- Worker memory limit: 1.43 GB
It seems that each future is not releasing its memory. How can I trigger that? I am using dask==1.2.0 on Python 2.7.
A result is held by the scheduler for as long as a client holds a future that points to it. The memory is released when the last future is garbage-collected by Python (or shortly after). In your case, you keep all of the futures in a list for the whole computation. You could try modifying your loop:
for done_work in as_completed(fcast_futures, with_results=False):
    try:
        result = done_work.result()
    except Exception as error:
        log.exception(error)
    done_work.release()
Or replace the as_completed loop with something that explicitly removes futures from the list once they have been processed.
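The underlying mechanism is plain Python reference counting: as long as any live object (here, your list) references a future, its result stays pinned on the cluster. The following pure-Python sketch (no Dask involved; `Result` is a hypothetical stand-in for a future's payload) illustrates why dropping the last reference frees the memory:

```python
import gc
import weakref


class Result(object):
    """Hypothetical stand-in for the payload a future keeps alive."""
    pass


# Keep "futures" in a list for the whole run, as in the question.
futures = [Result() for _ in range(3)]
probe = weakref.ref(futures[0])  # watch the first payload

# While the list still references it, the object stays alive.
assert probe() is not None

# Dropping the list's reference (analogous to calling release() or
# removing the future from the list) lets Python reclaim it.
del futures[0]
gc.collect()
assert probe() is None  # the payload has been collected
```

The same logic applies to real futures: once no client-side object references them, the scheduler is free to forget the corresponding results.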