本地集群上加载 Dask 数据:“工作线程超出了 95% 的内存预算”。重新启动然后“KilledWorker”

LCh*_*eng 3 memory-management cluster-computing worker bigdata dask-dataframe

我知道以前也有人问过类似的问题,但他们的解决方案并不是很有帮助。我想最好的解决方案可能更具体于每个集群配置,因此我在此处提供有关我的集群和错误的更多详细信息。

import dask.dataframe as dd
import dask.bag as db
import json

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)

这是我的集群设置

cluster.scheduler
Run Code Online (Sandbox Code Playgroud)

#输出:

Scheduler: tcp://127.0.0.1:35367 workers: 8 cores: 48 tasks: 0



cluster.workers
Run Code Online (Sandbox Code Playgroud)

#输出:

{0: <Nanny: tcp://127.0.0.1:43789, threads: 6>,
 1: <Nanny: tcp://127.0.0.1:41375, threads: 6>,
 2: <Nanny: tcp://127.0.0.1:42577, threads: 6>,
 3: <Nanny: tcp://127.0.0.1:40171, threads: 6>,
 4: <Nanny: tcp://127.0.0.1:32867, threads: 6>,
 5: <Nanny: tcp://127.0.0.1:46529, threads: 6>,
 6: <Nanny: tcp://127.0.0.1:41535, threads: 6>,
 7: <Nanny: tcp://127.0.0.1:39645, threads: 6>}


client
Run Code Online (Sandbox Code Playgroud)

#输出

Client
Scheduler: tcp://127.0.0.1:35367
Dashboard: http://127.0.0.1:8787/status
Cluster
Workers: 8
Cores: 48
Memory: 251.64 GiB
Run Code Online (Sandbox Code Playgroud)

这是我的数据加载代码:

b = db.read_text('2019-12-16-latest-level.json').map(json.loads)

def flatten(record):
    return {
        'uuid': record['uuid'],
        'stored_at': record['stored_at'],
        'duration': record['duration']
}
Run Code Online (Sandbox Code Playgroud)

上面的所有代码都运行良好。这是遇到麻烦的一个:

df = b.map(flatten).to_dataframe()
df.head() 
Run Code Online (Sandbox Code Playgroud)

代码运行了大约 1 天,并给出了以下警告:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Run Code Online (Sandbox Code Playgroud)

然后大约又一天,程序停止并给出了以下错误:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-10-84f98622da69> in <module>
      1 df = b.map(flatten).to_dataframe()
----> 2 df.head()
Run Code Online (Sandbox Code Playgroud)

这是错误报告的最后几行:

KilledWorker: ("('bag-from-delayed-file_to_blocks-list-loads-flatten-0daa9cba16c635566df6215c209f653c', 0)", <WorkerState 'tcp://127.0.0.1:41535', name: 6, memory: 0, processing: 1>)
Run Code Online (Sandbox Code Playgroud)

还附上了完整错误报告的屏幕截图: 截图1 截图2 截图3 截图4

关于如何处理这个问题有什么建议吗?谢谢。

the*_*den 8

我已经使用 dask 大约一个月了,结果好坏参半。我个人的信念是,该软件在执行任务图时在内存管理方面有某种致命的拥抱。dask 的典型操作方式是在短短几分钟内计算出大型计算的 95%,然后在接下来的 8 小时内处理最后 5%,在崩溃或耗尽计算预算之前似乎什么也不做。这非常令人沮丧。

也就是说,我使用更少的工作人员或将工作人员限制在进程而不是线程上取得了一些有限的成功。所以,在 16 核机器上我可能会这样做:

client = Client(processes=True, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)

另一件重要的事情就是明智地坚持下去。持久化会导致给定时间图中的任务减少(从而导致内存中的任务减少)。因此,如果我想从 json 文件中读取包,我会在转换为数据帧之前保留该包,否则读取和转换都发生在compute()步骤中,我发现这是失败的原因。

然而,正如我所说,考虑到它表面上的所有功能,我发现 dask 非常令人失望。我改用 vaex 了。

抱歉我无法提供更多帮助。