Dask 工作线程的内存清理

spi*_*ect 5 python dask dask-distributed

我正在多节点分布式 Dask 集群上运行多个并行任务。然而,一旦任务完成,worker 仍然持有大量内存,集群很快就会被填满。

client.restart()我在每个任务之后都尝试过client.cancel(df),第一个任务会杀死工作人员并发送到CancelledError其他正在运行的任务,这很麻烦,第二个任务没有多大帮助,因为我们在 Dask 的函数中使用了很多自定义对象和函数map。添加del已知变量gc.collect()也没有多大帮助。

我确信大部分内存占用是由于自定义 python 函数和使用client.map(..).

我的问题是:

  1. 有没有一种从命令行或其他方式类似的方法trigger worker restart if no tasks are running right now
  2. 如果不是,这个问题有哪些可能的解决方案?我不可能避免 Dask 任务中的自定义对象和纯 python 函数。

MRo*_*lin 2

如果没有对 future 的引用,那么 Dask 应该删除对您用它创建的 Python 对象的任何引用。有关如何调查此问题的更多信息,请参阅https://www.youtube.com/watch?v=MsnzpzFZAoQ 。

如果您的自定义 Python 代码确实存在一些内存泄漏,那么是的,您可以要求 Dask 工作人员定期重新启动。请参阅dask-worker --help手册页并查找以以下开头的关键字--lifetime