标签: dask

如何在不使用所有工作程序的情况下限制大量任务

想象一下,我有一个拥有10名工人和40个核心总数的dask网格.这是一个共享网格,所以我不想用我的工作完全浸透它.我有1000个任务要做,我想一次提交(并且主动运行)最多20个任务.

具体来说,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
Run Code Online (Sandbox Code Playgroud)

如果我设置了一个队列系统

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
Run Code Online (Sandbox Code Playgroud)

这将工作,但是,这将把我的所有任务转储到网格,使其饱和.理想情况下我可以:

e.scatter(input_q, max_submit=20)
Run Code Online (Sandbox Code Playgroud)

看来这里的文档示例允许我使用maxsize …

python dask

10
推荐指数
1
解决办法
551
查看次数

我可以在Dask/Distributed中使用从.py文件导入的函数吗?

我有关于序列化和导入的问题.

  • 功能应该有自己的进口吗?就像我见过的PySpark一样
  • 以下是完全错误的吗?是否mod.py需要一个畅达/ PIP封装?mod.py被写入共享文件系统.

In [1]: from distributed import Executor

In [2]: e = Executor('127.0.0.1:8786')

In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>

In [4]: import socket

In [5]: e.run(socket.gethostname)
Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'}

In [6]: %%file mod.py
   ...: def hostname():
   ...:     return 'the hostname'
   ...: 
Overwriting mod.py

In [7]: import mod

In [8]: mod.hostname()
Out[8]: 'the hostname'

In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'
Run Code Online (Sandbox Code Playgroud)

python distributed-computing dask

10
推荐指数
1
解决办法
2645
查看次数

如何指定默认dask调度程序的线程/进程数

有没有办法限制默认线程调度程序使用的内核数(使用dask数据帧时默认)?

使用compute,您可以使用以下命令指定它:

df.compute(get=dask.threaded.get, num_workers=20)
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有办法将其设置为默认值,因此您不需要为每次compute调用指定此项?

例如,在小型集群(例如64个核心)的情况下会很有趣,但是与其他人共享(没有作业系统),并且我不希望在使用dask开始计算时占用所有核心.

python dask

10
推荐指数
1
解决办法
3397
查看次数

我应该如何获得dask数据帧的形状?

执行.shape会给我以下错误.

AttributeError:'DataFrame'对象没有属性'shape'

我应该如何获得形状呢?

python dask

10
推荐指数
3
解决办法
5256
查看次数

在dask和pandas数据框中嵌套Numpy数组

适用于图像和音频的机器/深度学习代码的常见用例是加载和操纵图像或音频片段的大型数据集。这些数据集中的条目几乎总是由图像/音频段和元数据(例如,班级标签,培训/测试实例等)表示。

例如,在我的语音识别的特定用例中,数据集几乎总是由具有以下属性的条目组成:

  • 演讲者ID(字符串)
  • 成绩单(字符串)
  • 测试数据(布尔)
  • WAV数据(numpy数组)
  • 数据集名称(字符串)
  • ...

建议以大熊猫和/或dask表示这种数据集的推荐方法是-强调wav数据(在图像数据集中,这就是图像数据本身)?

在熊猫中,有一些技巧,可以将一列numpy数组嵌套在一个列中,但这不能很好地序列化,也不能使用dask。似乎这是一个非常普通的用例,但我找不到任何相关建议。

人们还可以将这些数组序列化/反序列化为二进制格式(Uber的petastorm做类似的事情),但这似乎错过了dask和pandas这样的库的要点,在库中自动幻化是核心优点之一。

我们欢迎任何实用的评论或对不同方法的建议。

python numpy pandas dask

10
推荐指数
1
解决办法
343
查看次数

将 Python 函数应用于 Pandas 分组数据帧 - 加速计算的最有效方法是什么?

我正在处理相当大的 Pandas DataFrame - 我的数据集类似于以下df设置:

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing pandas apache-spark dask

10
推荐指数
1
解决办法
680
查看次数

dask:client.persist和client.compute之间的区别

我感到困惑的区别是什么之间client.persist()client.compute()双方似乎(在某些情况下),开始我的计算,都返回异步对象,但不是在我的简单的例子:

在这个例子中

from dask.distributed import Client
from dask import delayed
client = Client()

def f(*args):
    return args

result = [delayed(f(x)) for x in range(1000)]

x1 = client.compute(result)
x2 = client.persist(result)
Run Code Online (Sandbox Code Playgroud)

这里x1x2它们不同,但是在一个不那么简单的计算中,result也是一个Delayed对象列表,使用client.persist(result)开始计算就像client.compute(result)那样.

python dask

9
推荐指数
1
解决办法
5348
查看次数

Dask数据帧中npartition的作用是什么?

npartitions在许多功能中都看到了参数,但是我不明白它的用途/用途。

http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.read_csv

头(...)

元素仅取自前n个分区,默认值为1。如果前n个分区中的行少于n,则将引发警告,并返回所有找到的行。传递-1以使用所有分区。

重新分区(...)

输出的分区数必须小于输入的npartitions。仅在未指定除法的情况下使用。

在这种情况下,分区数是否约为5:

(图片来源:http : //dask.pydata.org/en/latest/dataframe-overview.html

python dataframe dask

9
推荐指数
1
解决办法
3000
查看次数

Dask中KilledWorker异常是什么意思?

KilledWorker将Dask与dask.distributed调度程序一起使用时,我的任务将返回异常.这些错误意味着什么?

dask

9
推荐指数
1
解决办法
2547
查看次数

Dask连接的简单方法(水平,轴= 1,列)

操作将 两个csv(data.csv和label.csv)读取到单个数据帧.

df = dd.read_csv(data_files, delimiter=' ', header=None, names=['x', 'y', 'z', 'intensity', 'r', 'g', 'b'])
df_label = dd.read_csv(label_files, delimiter=' ', header=None, names=['label'])
Run Code Online (Sandbox Code Playgroud)

问题 列的连接需要已知的划分.但是,设置索引会对数据进行排序,这是我明确不想要的,因为两个文件的顺序都是匹配的.

df = dd.concat([df, df_label], axis=1)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-11-e6c2e1bdde55> in <module>()
----> 1 df = dd.concat([df, df_label], axis=1)

/uhome/hemmest/.local/lib/python3.5/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
    573             return concat_unindexed_dataframes(dfs)
    574         else:
--> 575             raise ValueError('Unable to concatenate DataFrame with unknown '
    576                              'division specifying axis=1')
    577     else:

ValueError: Unable to concatenate DataFrame with unknown …
Run Code Online (Sandbox Code Playgroud)

python pandas dask

9
推荐指数
1
解决办法
4078
查看次数