因此,基本上我想要并行运行ML Pipelines。我一直在使用scikit-learn,因此决定使用DaskGridSearchCV。
我有一个gridSearchCV = DaskGridSearchCV(pipeline, grid, scoring=evaluator)对象列表,并按顺序运行它们:
for gridSearchCV in list:
gridSearchCV.fit(train_data, train_target)
predicted = gridSearchCV.predict(test_data)
Run Code Online (Sandbox Code Playgroud)
如果我有N个不同的GridSearch对象,我想尽可能多地利用所有可用资源。如果有资源可以同时并行运行2、3、4,...或N,我想这样做。
因此,我开始根据dask的文档尝试一些操作。首先,我尝试了一下dask.threaded,dask.multiprocessing但结果却变慢了,而且我不断:
/Library/Python/2.7/site-packages/sklearn/externals/joblib/parallel.py:540: UserWarning: Multiprocessing backed parallel loops cannot be nested below threads, setting n_jobs=1
这是代码片段:
def run_pipeline(self, gs, data):
train_data, test_data, train_target, expected = train_test_split(data, target, test_size=0.25, random_state=33)
model = gs.fit(train_data, train_target)
predicted = gs.predict(test_data)
values = [delayed(run_pipeline)(gs, df) for gs in gs_list]
compute(*values, get=dask.threaded.get)
Run Code Online (Sandbox Code Playgroud)
也许我走错路了,您对我有什么建议吗?
我正在尝试使用dask并行构建字典,但是遇到了TypeError: Delayed objects of unspecified length are not iterable。
我正在尝试计算add,subtract并且multiply同时使字典的构建速度更快。
这是代表我的用例的一些代码:
import dask
from dask.delayed import delayed
x1 = {'a': 1, 'b': 2, 'c': 3}
x2 = {'a': 4, 'b': 5, 'c': 6}
@delayed
def add(d1, d2):
z = {}
z['func1_a'] = d1['a'] + d2['a']
z['func1_b'] = d1['b'] + d2['b']
z['func1_c'] = d1['c'] + d2['c']
return z
@delayed
def subtract(d1, d2):
z = {}
z['func2_a'] = d1['a'] - d2['a']
z['func2_b'] = d1['b'] …Run Code Online (Sandbox Code Playgroud) 我正在努力将a dask.bag词典转换dask.delayed pandas.DataFrames成最终版dask.dataframe
我有一个函数(make_dict)可以将文件读入一个相当复杂的嵌套字典结构中,还有一个函数(make_df)可以将这些词典转换成一个字典pandas.DataFrame(每个文件的数据帧约为100 mb)。我想将所有数据框附加到一个中,dask.dataframe以进行进一步分析。
到目前为止,我一直在使用dask.delayed对象加载,转换和附加所有运行良好的数据(请参见下面的示例)。但是,为了将来的工作,我想将加载的字典存储在dask.bagusing中dask.persist()。
我设法将数据加载到其中dask.bag,从而生成了一系列字典或pandas.DataFrame调用后可以在本地使用的字典列表compute()。但是,当我尝试将dask.bag变成dask.dataframe使用to_delayed()时,遇到了错误(请参阅下文)。
感觉好像我在这里遗漏了一些简单的东西,或者也许我的方法dask.bag是错误的?
以下示例显示了我使用简化函数的方法,并引发了相同的错误。任何有关如何解决此问题的建议,我们深表感谢。
import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag
print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2
def make_dict(n=1):
return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}
def make_df(d):
return pd.DataFrame(d['data'])
k = [1,2,3]
# using dask.delayed
dfs = []
for n in k:
delayed_1 = …Run Code Online (Sandbox Code Playgroud) 我正在 dask 中构建一个非常大的 DAG 以提交给分布式调度程序,其中节点对本身可能非常大的数据帧进行操作。一种模式是我有大约 50-60 个函数来加载数据并构建每个数百 MB 的 Pandas 数据帧(并且在逻辑上代表单个表的分区)。我想将这些连接到图中下游节点的单个 dask 数据帧中,同时最小化数据移动。我将任务链接如下:
dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
Run Code Online (Sandbox Code Playgroud)
在哪里
def pandas_to_dask(df):
return dask.dataframe.from_pandas(df).to_delayed()
Run Code Online (Sandbox Code Playgroud)
我尝试了各种concat_all实现,但这似乎是合理的:
def concat_all(dfs):
dfs = [dask.dataframe.from_delayed(df) for df in dfs]
return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
Run Code Online (Sandbox Code Playgroud)
所有的熊猫数据帧在它们的索引上都是不相交的,并且是排序的/单调的。
但是,concat_all即使每个人的内存预算实际上相当大并且我不希望它移动数据,我也会因为这个功能而被杀死(集群管理器因为超过他们的内存预算而杀死他们)。我有理由肯定,compute()在使用 dask 数据帧的图形节点中调用之前,我总是将数据切片为合理的子集。
--memory-limit到目前为止,我正在玩没有成功。我至少正确地解决了这个问题吗?是否有我遗漏的考虑?
使用 Dask 文档中的确切代码: https://jobqueue.dask.org/en/latest/examples.html
如果页面发生变化,代码如下:
from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed
cluster = SLURMCluster(memory='8g',
processes=1,
cores=2,
extra=['--resources ssdGB=200,GPU=2'])
cluster.scale(2)
client = Client(cluster)
def step_1_w_single_GPU(data):
return "Step 1 done for: %s" % data
def step_2_w_local_IO(data):
return "Step 2 done for: %s" % data
stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]
result_stage_2 = client.compute(stage_2,
resources={tuple(stage_1): {'GPU': 1},
tuple(stage_2): {'ssdGB': 100}})
Run Code Online (Sandbox Code Playgroud)
这会导致这样的错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback …Run Code Online (Sandbox Code Playgroud) 我正在努力弄清楚如何让dask延迟工作在涉及创建字典的特定工作流程上.
这里的想法是func1,func2,func3可以同时独立运行,我希望这些函数的结果是新字典中的值z.
from dask.delayed import delayed
x1 = {'a': 1, 'b': 2, 'c': 3}
x2 = {'a': 4, 'b': 5, 'c': 6}
@delayed
def func1(d1, d2):
return d1['a'] + d2['a']
@delayed
def func2(d1, d2):
return d1['b'] - d2['b']
@delayed
def func3(d1, d2):
return d1['c'] * d2['c']
z = {}
z['val1'] = func1(x1, x2)
z['val2'] = func2(x1, x2)
z['val3'] = func3(x1, x2)
Run Code Online (Sandbox Code Playgroud)
当我运行以下操作时,出现错误:
>>> result_dict = z.compute()
AttributeError: 'dict' object has no attribute 'compute'
Run Code Online (Sandbox Code Playgroud)
当我运行以下命令时,它会成功,但结果是元组而不是字典.
>>> result_dict = dask.compute(z) …Run Code Online (Sandbox Code Playgroud) 使用Pycharm社区2018.1.4
Python 3.6
Dask 2.8.1
尝试在我的某些方法上实现 dask 延迟并收到错误
AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)
这显然不是真的,所以我想知道我做错了什么。我的实现结构如下:
AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)
本质上,我有一堆数据文件,它们是彼此独立的,所以我想并行运行它们,而不是通过 for 循环顺序运行,如果你取出 dask.delayed ,我就会这样做。
我是否从根本上错过了上述 dask 延迟实现中的任何内容?
我有一个需要根据某些条件进行过滤的项目列表。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几十万条记录)。
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
Run Code Online (Sandbox Code Playgroud)
通过 Dask,我实现了以下目标:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False …Run Code Online (Sandbox Code Playgroud) 是否有一种更快的方法可以使用 Dask 仅检索大型已发布数组中的单个元素而不检索整个数组?
在下面的示例中,client.get_dataset('array1')[0] 与 client.get_dataset('array1') 花费的时间大致相同。
import distributed
client = distributed.Client()
data = [1]*10000000
payload = {'array1': data}
client.publish(**payload)
one_element = client.get_dataset('array1')[0]
Run Code Online (Sandbox Code Playgroud) 我正在尝试从一组压缩的 CSV 文件创建 dask 数据框。阅读问题,似乎 dask 需要使用 dask.distributedelasted()
import glob
import dask.dataframe as dd
import zipfile
import pandas as pd
from dask.delayed import delayed
#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = {}
for f in file_list:
key = f.split('/')[5][:-4]
zip_dict[key] = zipfile.ZipFile(f)
Run Code Online (Sandbox Code Playgroud)
zip_dict = {'log20160201': zipfile.ZipFile filename='/my_directory/zip_files/log20160201.zip' mode='r', 'log20160218': zipfile.ZipFile filename='/my_directory/zip_files/log20160218.zip' 的示例内容模式='r'}
# Create list of delayed pd.read_csv()
d_rows = []
for k, v in zip_dict.items():
row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
d_rows.append(row)
v.close()
Run Code Online (Sandbox Code Playgroud)
d_rows …
dask ×10
dask-delayed ×10
python ×6
dictionary ×3
bag ×1
pandas ×1
python-3.x ×1
scikit-learn ×1
zip ×1