与此问题类似,我遇到了Dask分布的内存问题.但是,在我的情况下,解释并不是客户端试图收集大量数据.
可以基于非常简单的任务图来说明该问题:delayed操作列表生成一些固定大小为~500 MB的随机DataFrame(以模拟从文件加载许多分区).任务图中的下一个操作是获取每个DataFrame的大小.最后,所有大小都减少到一个总大小,即必须返回给客户端的数据很小.
出于测试目的,我正在运行本地调度程序/工作程序单线程,限制为2GB内存,即:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
Run Code Online (Sandbox Code Playgroud)
我对任务图的期望是工作者永远不需要超过500 MB的RAM,因为在"生成数据"之后直接运行"获取数据大小"应该立即使数据变小.但是,我观察到工作人员需要更多的内存:
因子2表示数据必须在内部复制.因此,任何使分区大小接近节点的物理内存的尝试都会导致MemoryErrors或大量交换.
任何有关这方面的信息都非常感谢.特别是:
memory-limit影响这种行为?从我的测试来看,使用较低的阈值似乎更早地触发GC(和/或溢出到磁盘?),但另一方面,还有其他内存峰值甚至超过使用更高阈值的峰值内存.请注意,我知道我可以通过在第一个操作中获取大小来解决这个特定问题,并且可能Dask的单机执行器更适合于该问题,但我要求教育目的.
附件1:测试代码
from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor
def simulate_df_partition_load(part_id):
"""
Creates a random DataFrame of ~500 MB
"""
num_rows = 5000000
num_cols = 13
df = …Run Code Online (Sandbox Code Playgroud) 在使用延迟转换我的程序时,我偶然发现了一种常用的编程模式,该模式不适用于延迟.例:
from dask import delayed
@delayed
def myFunction():
return 1,2
a, b = myFunction()
a.compute()
Run Code Online (Sandbox Code Playgroud)
提高:TypeError: Delayed objects of unspecified length are not iterable
虽然以下解决方法没有.但看起来更笨拙
from dask import delayed
@delayed
def myFunction():
return 1,2
dummy = myFunction()
a, b = dummy[0], dummy[1]
a.compute()
Run Code Online (Sandbox Code Playgroud)
这是预期的行为吗?
我想在 dask 中找到pandas.dataframe.sort_value函数的替代方法。
我是通过set_index 来的,但它会按单列排序。
如何对 Dask 数据框的多列进行排序?
我有一个超过35000行的熊猫系列。我想使用dask使其更有效率。但是,我的dask代码和pandas代码同时使用。最初,“ ser”是熊猫系列,而fun1和fun2是在系列的各个行中执行模式匹配的基本功能。
大熊猫
ser = ser.apply(fun1).apply(fun2)
达斯克
ser = dd.from_pandas(ser, npartitions = 16)
ser = ser.apply(fun1).apply(fun2)
在检查cpu核心的状态时,我发现并不是所有的核心都被使用了。只有一个内核已经习惯了100%。
有没有什么方法可以使用dask加快序列代码的速度,或者在串行执行dask操作时利用cpu的所有核心?
从文档来看,Number of allowed automatic retries if computing a result fails.
“结果”是指每个单独的任务还是整个compute()调用?
如果是指整个调用,那么dask.delayed中如何实现每个任务的重试呢?
另外,我不确定重试是否有效,如下面的代码所示。
import dask
import random
@dask.delayed
def add(x, y):
return x + y
@dask.delayed
def divide(sum_i):
n = random.randint(0, 1)
result = sum_i / n
return result
tasks = []
for i in range(3):
sum_i = add(i, i+1)
divide_n = divide(sum_i)
tasks.append(divide_n)
dask.compute(*tasks, retries=1000)
Run Code Online (Sandbox Code Playgroud)
预期输出为 (1, 3, 5),实际输出为 ZeroDivisionError。
我试图找到使用for循环并延迟dask的正确语法。我发现了一些教程和其他问题,但都不适合我的情况,这是非常基础的。
首先,这是并行运行for循环的正确方法吗?
%%time
list_names=['a','b','c','d']
keep_return=[]
@delayed
def loop_dummy(target):
for i in range (1000000000):
pass
print('passed value is:'+target)
return(1)
for i in list_names:
c=loop_dummy(i)
keep_return.append(c)
total = delayed(sum)(keep_return)
total.compute()
Run Code Online (Sandbox Code Playgroud)
这产生了
passed value is:a
passed value is:b
passed value is:c
passed value is:d
Wall time: 1min 53s
Run Code Online (Sandbox Code Playgroud)
如果我连续运行
%%time
list_names=['a','b','c','d']
keep_return=[]
def loop_dummy(target):
for i in range (1000000000):
pass
print('passed value is:'+target)
return(1)
for i in list_names:
c=loop_dummy(i)
keep_return.append(c)
Run Code Online (Sandbox Code Playgroud)
它实际上更快。
passed value is:a
passed value is:b
passed value is:c
passed value is:d …Run Code Online (Sandbox Code Playgroud) 我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,在dask.distributed客户端上运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何实现。
是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?
stack.rechunk优化块大小,包括按行和列自动分块(行块在dask调度程序中似乎运行得更快)。compute()和persist()(相同的结果)。dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。dask.distributed内存限制,但忽略了内存限制。cluster = distributed.LocalCluster(memory_limit = 8e9)nX及nY以下),dask.distributed客户端确实完成了任务,但是它仍然比dask调度程序需要更多的时间和内存。这个例子重现了这个问题:
import dask
import distributed
import numpy as np
import dask.array as da …Run Code Online (Sandbox Code Playgroud) [mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786
distributed.nanny - INFO - Start Nanny at: 'tcp://172.26.32.36:50930'
distributed.diskutils - WARNING - Found stale lock file and directory '/home/mapr/latest_code_deepak/dask-worker-space/worker-PwEseH', purging
distributed.worker - INFO - Start worker at: tcp://172.26.32.36:41694
distributed.worker - INFO - Listening to: tcp://172.26.32.36:41694
distributed.worker - INFO - bokeh at: 172.26.32.36:8789
distributed.worker - INFO - nanny at: 172.26.32.36:50930
distributed.worker - INFO - Waiting to connect to: tcp://172.26.32.37:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 8
distributed.worker - INFO - Memory: 33.52 GB …Run Code Online (Sandbox Code Playgroud) 我喜欢 dask 的简单性,并且喜欢用它来抓取当地的超市。我的 multiprocessing.cpu_count() 是 4,但这段代码仅实现了 2 倍的加速。为什么?
from bs4 import BeautifulSoup
import dask, requests, time
import pandas as pd
base_url = 'https://www.lider.cl/supermercado/category/Despensa/?No={}&isNavRequest=Yes&Nrpp=40&page={}'
def scrape(id):
page = id+1; start = 40*page
bs = BeautifulSoup(requests.get(base_url.format(start,page)).text,'lxml')
prods = [prod.text for prod in bs.find_all('span',attrs={'class':'product-description js-ellipsis'})]
prods = [prod.text for prod in prods]
brands = [b.text for b in bs.find_all('span',attrs={'class':'product-name'})]
sdf = pd.DataFrame({'product': prods, 'brand': brands})
return sdf
data = [dask.delayed(scrape)(id) for id in range(10)]
df = dask.delayed(pd.concat)(data)
df = df.compute()
Run Code Online (Sandbox Code Playgroud) 将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。
\n我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。
\n考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。
\nimport numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n fn = ECDF(x)\n return fn(x)\n\ndef ecdf_array(X):\n\n res = np.zeros_like(X)\n for i in range(X.shape[1]):\n res[:,i] = ecdf(X[:,i])\n \n return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit …Run Code Online (Sandbox Code Playgroud)