想象一下,我有一个拥有10名工人和40个核心总数的dask网格.这是一个共享网格,所以我不想用我的工作完全浸透它.我有1000个任务要做,我想一次提交(并且主动运行)最多20个任务.
具体来说,
from time import sleep
from random import random
def inc(x):
from random import random
sleep(random() * 2)
return x + 1
def double(x):
from random import random
sleep(random())
return 2 * x
>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
Run Code Online (Sandbox Code Playgroud)
如果我设置了一个队列系统
>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
Run Code Online (Sandbox Code Playgroud)
这将工作,但是,这将把我的所有任务转储到网格,使其饱和.理想情况下我可以:
e.scatter(input_q, max_submit=20)
Run Code Online (Sandbox Code Playgroud)
看来这里的文档示例允许我使用maxsize …
我有关于序列化和导入的问题.
mod.py需要一个畅达/ PIP封装?mod.py被写入共享文件系统.In [1]: from distributed import Executor
In [2]: e = Executor('127.0.0.1:8786')
In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>
In [4]: import socket
In [5]: e.run(socket.gethostname)
Out[5]: {'172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'}
In [6]: %%file mod.py
...: def hostname():
...: return 'the hostname'
...:
Overwriting mod.py
In [7]: import mod
In [8]: mod.hostname()
Out[8]: 'the hostname'
In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'
Run Code Online (Sandbox Code Playgroud) 有没有办法限制默认线程调度程序使用的内核数(使用dask数据帧时默认)?
使用compute,您可以使用以下命令指定它:
df.compute(get=dask.threaded.get, num_workers=20)
Run Code Online (Sandbox Code Playgroud)
但我想知道是否有办法将其设置为默认值,因此您不需要为每次compute调用指定此项?
例如,在小型集群(例如64个核心)的情况下会很有趣,但是与其他人共享(没有作业系统),并且我不希望在使用dask开始计算时占用所有核心.
执行.shape会给我以下错误.
AttributeError:'DataFrame'对象没有属性'shape'
我应该如何获得形状呢?
适用于图像和音频的机器/深度学习代码的常见用例是加载和操纵图像或音频片段的大型数据集。这些数据集中的条目几乎总是由图像/音频段和元数据(例如,班级标签,培训/测试实例等)表示。
例如,在我的语音识别的特定用例中,数据集几乎总是由具有以下属性的条目组成:
建议以大熊猫和/或dask表示这种数据集的推荐方法是-强调wav数据(在图像数据集中,这就是图像数据本身)?
在熊猫中,有一些技巧,可以将一列numpy数组嵌套在一个列中,但这不能很好地序列化,也不能使用dask。似乎这是一个非常普通的用例,但我找不到任何相关建议。
人们还可以将这些数组序列化/反序列化为二进制格式(Uber的petastorm做类似的事情),但这似乎错过了dask和pandas这样的库的要点,在库中自动幻化是核心优点之一。
我们欢迎任何实用的评论或对不同方法的建议。
我正在处理相当大的 Pandas DataFrame - 我的数据集类似于以下df设置:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 …Run Code Online (Sandbox Code Playgroud) 我感到困惑的区别是什么之间client.persist()和client.compute()双方似乎(在某些情况下),开始我的计算,都返回异步对象,但不是在我的简单的例子:
在这个例子中
from dask.distributed import Client
from dask import delayed
client = Client()
def f(*args):
return args
result = [delayed(f(x)) for x in range(1000)]
x1 = client.compute(result)
x2 = client.persist(result)
Run Code Online (Sandbox Code Playgroud)
这里x1和x2它们不同,但是在一个不那么简单的计算中,result也是一个Delayed对象列表,使用client.persist(result)开始计算就像client.compute(result)那样.
我npartitions在许多功能中都看到了参数,但是我不明白它的用途/用途。
http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.read_csv
头(...)
元素仅取自前n个分区,默认值为1。如果前n个分区中的行少于n,则将引发警告,并返回所有找到的行。传递-1以使用所有分区。
重新分区(...)
输出的分区数必须小于输入的npartitions。仅在未指定除法的情况下使用。
在这种情况下,分区数是否约为5:
(图片来源:http : //dask.pydata.org/en/latest/dataframe-overview.html)
KilledWorker将Dask与dask.distributed调度程序一起使用时,我的任务将返回异常.这些错误意味着什么?
操作将 两个csv(data.csv和label.csv)读取到单个数据帧.
df = dd.read_csv(data_files, delimiter=' ', header=None, names=['x', 'y', 'z', 'intensity', 'r', 'g', 'b'])
df_label = dd.read_csv(label_files, delimiter=' ', header=None, names=['label'])
Run Code Online (Sandbox Code Playgroud)
问题 列的连接需要已知的划分.但是,设置索引会对数据进行排序,这是我明确不想要的,因为两个文件的顺序都是匹配的.
df = dd.concat([df, df_label], axis=1)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-11-e6c2e1bdde55> in <module>()
----> 1 df = dd.concat([df, df_label], axis=1)
/uhome/hemmest/.local/lib/python3.5/site-packages/dask/dataframe/multi.py in concat(dfs, axis, join, interleave_partitions)
573 return concat_unindexed_dataframes(dfs)
574 else:
--> 575 raise ValueError('Unable to concatenate DataFrame with unknown '
576 'division specifying axis=1')
577 else:
ValueError: Unable to concatenate DataFrame with unknown …Run Code Online (Sandbox Code Playgroud)