import joblib
from sklearn.externals.joblib import parallel_backend
# Imports are resolved up front so the context manager below wraps only the
# work that should actually run on the Dask backend.
import xgboost
from dask_ml.model_selection import GridSearchCV
from xgboost import XGBRegressor

# `param_grid`, `df2` and `df3` are not defined in this snippet; they must be
# provided by the surrounding code (hyper-parameter grid, features, target).
# NOTE(review): the original indentation was lost; the search and fit are
# assumed to belong inside the `with` body -- confirm against the source.
# NOTE(review): dask_ml's GridSearchCV presumably schedules work on Dask by
# itself, in which case the joblib backend context is redundant -- confirm.
with joblib.parallel_backend('dask'):
    grid_search = GridSearchCV(
        estimator=XGBRegressor(),
        param_grid=param_grid,
        cv=3,
        n_jobs=-1,
    )
    grid_search.fit(df2, df3)
Run Code Online (Sandbox Code Playgroud)
我使用两台本地机器创建了一个 dask 集群
import dask.distributed

# Connect to the scheduler of the existing two-machine cluster.
# The constructor is the `Client` class (capital C); the lowercase name
# `client` is not the class and cannot be used to open a connection.
client = dask.distributed.Client('tcp://191.xxx.xx.xxx:8786')
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用 dask gridsearchcv 找到最佳参数。我面临以下错误。
distributed.scheduler - ERROR - Couldn't gather keys {"('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)": ['tcp://127.0.0.1:3738']} state: ['processing'] workers: ['tcp://127.0.0.1:3738']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:3738'], ('xgbregressor-fit-score-7cb7087b3aff75a31f487cfe5a9cedb0', 1202, 2)
NoneType: None
distributed.client - WARNING - Couldn't gather …
Run Code Online (Sandbox Code Playgroud)
我可以像这样使用 dask-ml 分别用均值和众数(出现次数最多的值)来填补缺失值,效果很好:
# One imputer per strategy: column mean and most frequent value.
# NOTE(review): `impute` is imported outside this snippet; the surrounding
# text suggests it is dask_ml.impute -- confirm.
mean_imputer = impute.SimpleImputer(strategy='mean')
most_frequent_imputer = impute.SimpleImputer(strategy='most_frequent')

# Three-row sample frame whose middle row is entirely missing.
data = [
    [100, 2, 5],
    [np.nan, np.nan, np.nan],
    [70, 7, 5],
]
df = pd.DataFrame(data, columns=['Weight', 'Age', 'Height'])

# Positional column selections: the first two columns are filled with the
# mean, the third with the most frequent value.
mean_cols = [0, 1]
most_frequent_cols = [2]

mean_filled = mean_imputer.fit_transform(df.iloc[:, mean_cols])
df.iloc[:, mean_cols] = mean_filled

most_frequent_filled = most_frequent_imputer.fit_transform(df.iloc[:, most_frequent_cols])
df.iloc[:, most_frequent_cols] = most_frequent_filled

print(df)
Weight Age Height
0 100.0 2.0 5.0
1 85.0 4.5 5.0
2 70.0 7.0 5.0
Run Code Online (Sandbox Code Playgroud)
但是,如果我有 1 亿行数据,dask 似乎会对数据遍历两次,而实际上只需遍历一次即可。是否可以同时和/或并行(而不是按顺序)运行这两个插补器(imputer)?实现这一目标的示例代码是什么?
我正在一台具有 16GB RAM 的机器上运行下面粘贴的代码(故意)。
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np
from dask_ml.cluster import KMeans
from dask.distributed import Client
# Start a local Dask client: 4 workers with 1 thread each and a 2GB memory
# limit, with logging left enabled and the dashboard on 8787.
# NOTE(review): scheduler_port=0 presumably lets the OS pick a free port --
# confirm against the distributed documentation.
client = Client(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
    memory_limit='2GB',
    scheduler_port=0,
    silence_logs=False,
    dashboard_address=8787,
)
n_centers = 12
n_features = 4

# Draw a small labelled sample first; its per-label means become the fixed
# cluster centres computed below.
X_small, y_small = make_blobs(
    n_samples=1000,
    centers=n_centers,
    n_features=n_features,
    random_state=0,
)

# One row per centre: the mean (over axis 0) of the samples carrying label i.
centers = np.zeros((n_centers, n_features))
for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
print(centers)

# 450 * 650 * 900 = 263,250,000 samples per block, across 4 blocks.
n_samples_per_block = 450 * 650 * 900
n_blocks = 4
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
centers=centers,
n_features=n_features,
random_state=i)[0]
for i …
Run Code Online (Sandbox Code Playgroud)