标签: dask-delayed

了解Dask分布式的内存行为

此问题类似,我遇到了Dask分布的内存问题.但是,在我的情况下,解释并不是客户端试图收集大量数据.

可以基于非常简单的任务图来说明该问题:delayed操作列表生成一些固定大小为~500 MB的随机DataFrame(以模拟从文件加载许多分区).任务图中的下一个操作是获取每个DataFrame的大小.最后,所有大小都减少到一个总大小,即必须返回给客户端的数据很小.

出于测试目的,我正在运行本地调度程序/工作程序单线程,限制为2GB内存,即:

$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
Run Code Online (Sandbox Code Playgroud)

我对任务图的期望是工作者永远不需要超过500 MB的RAM,因为在"生成数据"之后直接运行"获取数据大小"应该立即使数据变小.但是,我观察到工作人员需要更多的内存:

内存使用情况

因子2表示数据必须在内部复制.因此,任何使分区大小接近节点的物理内存的尝试都会导致MemoryErrors或大量交换.

任何有关这方面的信息都非常感谢.特别是:

  • 我是否可以控制数据的重复,是否可以避免这种情况?或者一般的经验法则是将有效载荷保持在50%以下以解决数据重复问题?
  • 工人如何memory-limit影响这种行为?从我的测试来看,使用较低的阈值似乎更早地触发GC(和/或溢出到磁盘?),但另一方面,还有其他内存峰值甚至超过使用更高阈值的峰值内存.

请注意,我知道我可以通过第一个操作中获取大小来解决这个特定问题,并且可能Dask的单机执行器更适合于该问题,但我要求教育目的.


附件1:测试代码

from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor


def simulate_df_partition_load(part_id):
    """
    Creates a random DataFrame of ~500 MB
    """
    num_rows = 5000000
    num_cols = 13

    df = …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed

9
推荐指数
1
解决办法
2037
查看次数

解包延迟功能的结果

在使用延迟转换我的程序时,我偶然发现了一种常用的编程模式,该模式不适用于延迟.例:

from dask import delayed
@delayed
def myFunction():
    return 1,2

a, b = myFunction()
a.compute()
Run Code Online (Sandbox Code Playgroud)

提高:TypeError: Delayed objects of unspecified length are not iterable 虽然以下解决方法没有.但看起来更笨拙

from dask import delayed
@delayed
def myFunction():
    return 1,2

dummy = myFunction()
a, b = dummy[0], dummy[1]
a.compute()
Run Code Online (Sandbox Code Playgroud)

这是预期的行为吗?

python dask dask-delayed

8
推荐指数
1
解决办法
1200
查看次数

在 Dask 中排序

我想在 dask 中找到pandas.dataframe.sort_value函数的替代方法。
我是通过set_index 来的,但它会按单列排序。

如何对 Dask 数据框的多列进行排序?

sorting dask dask-delayed dask-distributed

8
推荐指数
1
解决办法
4462
查看次数

如何使用Dask使用所有cpu核心?

我有一个超过35000行的熊猫系列。我想使用dask使其更有效率。但是,我的dask代码和pandas代码同时使用。最初,“ ser”是熊猫系列,而fun1fun2是在系列的各个行中执行模式匹配的基本功能。
大熊猫
ser = ser.apply(fun1).apply(fun2)

达斯克
ser = dd.from_pandas(ser, npartitions = 16) ser = ser.apply(fun1).apply(fun2)

在检查cpu核心的状态时,我发现并不是所有的核心都被使用了。只有一个内核已经习惯了100%。

有没有什么方法可以使用dask加快序列代码的速度,或者在串行执行dask操作时利用cpu的所有核心?

dask dask-delayed dask-distributed

8
推荐指数
1
解决办法
4600
查看次数

dask.compute() 中的重试不清楚

从文档来看,Number of allowed automatic retries if computing a result fails.

“结果”是指每个单独的任务还是整个compute()调用?

如果是指整个调用,那么dask.delayed中如何实现每个任务的重试呢?

另外,我不确定重试是否有效,如下面的代码所示。

import dask
import random

@dask.delayed
def add(x, y):
    return x + y

@dask.delayed
def divide(sum_i):
    n = random.randint(0, 1)
    result = sum_i / n
    return result

tasks = []
for i in range(3):
    sum_i = add(i, i+1)
    divide_n = divide(sum_i)
    tasks.append(divide_n)

dask.compute(*tasks, retries=1000)
Run Code Online (Sandbox Code Playgroud)

预期输出为 (1, 3, 5),实际输出为 ZeroDivisionError。

dask dask-delayed

7
推荐指数
1
解决办法
862
查看次数

Dask For Loop平行

我试图找到使用for循环并延迟dask的正确语法。我发现了一些教程和其他问题,但都不适合我的情况,这是非常基础的。

首先,这是并行运行for循环的正确方法吗?

%%time

list_names=['a','b','c','d']
keep_return=[]

@delayed
def loop_dummy(target):
    for i in range (1000000000):
        pass
    print('passed value is:'+target)
    return(1)


for i in list_names:
    c=loop_dummy(i)
    keep_return.append(c)


total = delayed(sum)(keep_return)
total.compute()
Run Code Online (Sandbox Code Playgroud)

这产生了

passed value is:a
passed value is:b
passed value is:c
passed value is:d
Wall time: 1min 53s
Run Code Online (Sandbox Code Playgroud)

如果我连续运行

%%time

list_names=['a','b','c','d']
keep_return=[]


def loop_dummy(target):
    for i in range (1000000000):
        pass
    print('passed value is:'+target)
    return(1)


for i in list_names:
    c=loop_dummy(i)
    keep_return.append(c)
Run Code Online (Sandbox Code Playgroud)

它实际上更快。

passed value is:a
passed value is:b
passed value is:c
passed value is:d …
Run Code Online (Sandbox Code Playgroud)

dask dask-delayed

6
推荐指数
1
解决办法
2346
查看次数

dask 和 dask.distributed 之间的巨大内存使用差异

我试图用来dask.delayed计算一个大矩阵以供以后计算使用。我只在一台本地机器上运行代码。当我使用dask单机调度程序时,它工作正常,但有点慢。要访问更多选项和性能监视器以改进我想dask.distributed在单台机器上使用的代码。然而,在dask.distributed客户端上运行相同的代码会慢慢耗尽所有可用内存并崩溃而没有任何实现。

是否有不同的设置问题的方法可以让dask.distributed客户端以更好的内存效率完成?

  • 我通读了 dask.delayed最佳实践指南,并认为我们正在正确使用它。
  • 我已经在本地 Win 10 PC (64GB RAM) 和 Azure Win Server 2012 VM (256 GB) 上运行它,结果相同。
  • 我试过手动设置块。
  • 我尝试使用stack.rechunk优化块大小,包括按行和列自动分块(行块在dask调度程序中似乎运行得更快)。
  • 我试过使用compute()persist()(相同的结果)。
  • 我尝试dask.distributed使用线程和进程调度程序启动客户端并调整工作人员的数量。threads在死亡之前更快地使用更多的 RAM。
  • 我已尝试根据此答案设置dask.distributed内存限制,但忽略了内存限制。cluster = distributed.LocalCluster(memory_limit = 8e9)
  • 如果我减少问题的大小(nXnY以下),dask.distributed客户端确实完成了任务,但是它仍然比dask调度程序需要更多的时间和内存。

这个例子重现了这个问题:

import dask
import distributed
import numpy as np
import dask.array as da …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed dask-distributed

6
推荐指数
0
解决办法
1492
查看次数

dask 工作人员存储结果或文件的默认目录是什么?

[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.26.32.36:50930'
distributed.diskutils - WARNING - Found stale lock file and directory '/home/mapr/latest_code_deepak/dask-worker-space/worker-PwEseH', purging
distributed.worker - INFO -       Start worker at:   tcp://172.26.32.36:41694
distributed.worker - INFO -          Listening to:   tcp://172.26.32.36:41694
distributed.worker - INFO -              bokeh at:          172.26.32.36:8789
distributed.worker - INFO -              nanny at:         172.26.32.36:50930
distributed.worker - INFO - Waiting to connect to:    tcp://172.26.32.37:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          8
distributed.worker - INFO -                Memory:                   33.52 GB …
Run Code Online (Sandbox Code Playgroud)

dask dask-delayed dask-distributed

5
推荐指数
1
解决办法
3430
查看次数

使用 dask 通过请求进行抓取

我喜欢 dask 的简单性,并且喜欢用它来抓取当地的超市。我的 multiprocessing.cpu_count() 是 4,但这段代码仅实现了 2 倍的加速。为什么?

from bs4 import BeautifulSoup
import dask, requests, time
import pandas as pd

base_url = 'https://www.lider.cl/supermercado/category/Despensa/?No={}&isNavRequest=Yes&Nrpp=40&page={}'

def scrape(id):
    page = id+1; start = 40*page
    bs = BeautifulSoup(requests.get(base_url.format(start,page)).text,'lxml')
    prods = [prod.text for prod in bs.find_all('span',attrs={'class':'product-description js-ellipsis'})]
    prods = [prod.text for prod in prods]
    brands = [b.text for b in bs.find_all('span',attrs={'class':'product-name'})]

    sdf = pd.DataFrame({'product': prods, 'brand': brands})
    return sdf

data = [dask.delayed(scrape)(id) for id in range(10)]
df = dask.delayed(pd.concat)(data)
df = df.compute()
Run Code Online (Sandbox Code Playgroud)

screen-scraping python-requests dask dask-delayed

5
推荐指数
1
解决办法
1530
查看次数

对 Dask 数组的列应用函数

将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。

\n

我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。

\n

考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。

\n
import numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n    fn = ECDF(x)\n    return fn(x)\n\ndef ecdf_array(X):\n\n    res = np.zeros_like(X)\n    for i in range(X.shape[1]):\n        res[:,i] = ecdf(X[:,i])\n        \n    return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed dask-distributed dask-dataframe

5
推荐指数
1
解决办法
682
查看次数