blu*_*e10 9 python dask dask-delayed
与此问题类似,我遇到了Dask分布的内存问题.但是,在我的情况下,解释并不是客户端试图收集大量数据.
可以基于非常简单的任务图来说明该问题:delayed操作列表生成一些固定大小为~500 MB的随机DataFrame(以模拟从文件加载许多分区).任务图中的下一个操作是获取每个DataFrame的大小.最后,所有大小都减少到一个总大小,即必须返回给客户端的数据很小.
出于测试目的,我正在运行本地调度程序/工作程序单线程,限制为2GB内存,即:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
Run Code Online (Sandbox Code Playgroud)
我对任务图的期望是工作者永远不需要超过500 MB的RAM,因为在"生成数据"之后直接运行"获取数据大小"应该立即使数据变小.但是,我观察到工作人员需要更多的内存:
因子2表示数据必须在内部复制.因此,任何使分区大小接近节点的物理内存的尝试都会导致MemoryErrors或大量交换.
任何有关这方面的信息都非常感谢.特别是:
memory-limit影响这种行为?从我的测试来看,使用较低的阈值似乎更早地触发GC(和/或溢出到磁盘?),但另一方面,还有其他内存峰值甚至超过使用更高阈值的峰值内存.请注意,我知道我可以通过在第一个操作中获取大小来解决这个特定问题,并且可能Dask的单机执行器更适合于该问题,但我要求教育目的.
附件1:测试代码
from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor
def simulate_df_partition_load(part_id):
"""
Creates a random DataFrame of ~500 MB
"""
num_rows = 5000000
num_cols = 13
df = pd.DataFrame()
for i in xrange(num_cols):
data_col = np.random.uniform(0, 1, num_rows)
df["col_{}".format(i)] = data_col
del data_col # for max GC-friendliness
print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
part_id, df.shape[0], df.shape[1],
df.memory_usage().sum() / (2 ** 20)
))
return df
e = Executor('127.0.0.1:8786', set_as_default=True)
num_partitions = 2
lazy_dataframes = [
delayed(simulate_df_partition_load)(part_id)
for part_id in xrange(num_partitions)
]
length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)
length_total = dag.compute()
Run Code Online (Sandbox Code Playgroud)
附件2: DAG插图
这里有几个问题:
工作人员可能在进入第一个计算大小任务之前运行两个创建数据任务.这是因为调度程序将所有当前可运行的任务分配给工作程序,可能比一次运行的任务多.工作人员完成第一个并报告给调度程序.当调度程序确定要向工作程序发送的新任务(计算大小任务)时,工作程序立即启动另一个创建数据任务.
是.
工作人员将开始将最近最少使用的数据元素写入磁盘.当您默认使用大约60%的内存时(通过__sizeof__协议测量),它会执行此操作.
注意:谢谢你提出的问题