我正在多节点分布式 Dask 集群上运行多个并行任务。然而,一旦任务完成,worker 仍然持有大量内存,集群很快就会被填满。
client.restart()我在每个任务之后都尝试过client.cancel(df),第一个任务会杀死工作人员并发送到CancelledError其他正在运行的任务,这很麻烦,第二个任务没有多大帮助,因为我们在 Dask 的函数中使用了很多自定义对象和函数map。添加del已知变量gc.collect()也没有多大帮助。
我确信大部分内存占用是由于自定义 python 函数和使用client.map(..).
我的问题是:
trigger worker restart if no tasks are running right now?