GridSearchCV does not work with the Pipeline memory parameter combined with parallelism (n_jobs > 1)

Dom*_*nik 6 python machine-learning scikit-learn

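I use a Pipeline as the estimator for GridSearchCV, and this works well. However, if I enable caching via the memory parameter and set n_jobs to a value greater than 1, the score columns of cv_results_ are NaN and the search finishes within seconds instead of minutes.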


Is it not possible to use the caching feature with GridSearchCV, or am I doing something wrong?

gsCV = GridSearchCV(
    estimator=Pipeline(
        # memory='../Cache/AW1MP_N10_DIN276_Pipeline', # not working if enabled
        steps=[
            ('we', FastTextTransformer()),
            ('se', AverageWordVectorTransformer()),
            ('rf', RandomForestClassifier())
        ]

    ),
    param_grid=[
        {
            'we__min_count': [5],
            'we__size': [64],
            'we__window': [5],
            'we__min_n': [3],
            'we__max_n': [6],
            'rf__n_estimators': [1, 2, 3, 4, 5, 10],# 25, 64, 128], # number of trees in forest
            'rf__criterion':['gini'],#'entropy'], # split criterion
            'rf__max_features':['auto'], # number of features per tree,
            'rf__max_depth':[4, 8, 16]#, 32, 64, 128]
        }
    ],
    cv=CV,
    verbose=VERBOSE,
    n_jobs=N_JOBS,
    return_train_score=True,
    scoring=None
)

gsCV.fit(X_train, label_encoder.inverse_transform(Y_train).reshape(-1))

Output without the memory parameter

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   1 tasks      | elapsed:   28.9s
[Parallel(n_jobs=6)]: Done   6 tasks      | elapsed:   29.4s
[Parallel(n_jobs=6)]: Done  13 tasks      | elapsed:  1.5min
[Parallel(n_jobs=6)]: Done  20 tasks      | elapsed:  2.0min
[Parallel(n_jobs=6)]: Done  29 tasks      | elapsed:  2.5min
[Parallel(n_jobs=6)]: Done  38 tasks      | elapsed:  3.5min
[Parallel(n_jobs=6)]: Done  49 tasks      | elapsed:  4.5min
[Parallel(n_jobs=6)]: Done  60 tasks      | elapsed:  5.1min
[Parallel(n_jobs=6)]: Done  73 tasks      | elapsed:  6.6min
[Parallel(n_jobs=6)]: Done  90 out of  90 | elapsed:  7.6min finished

Output with the memory parameter set to a path

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   1 tasks      | elapsed:    3.3s
[Parallel(n_jobs=6)]: Done   6 tasks      | elapsed:    3.3s
[Parallel(n_jobs=6)]: Done  13 tasks      | elapsed:    3.7s
[Parallel(n_jobs=6)]: Done  20 tasks      | elapsed:    4.0s
[Parallel(n_jobs=6)]: Done  29 tasks      | elapsed:    4.3s
[Parallel(n_jobs=6)]: Done  38 tasks      | elapsed:    4.7s
[Parallel(n_jobs=6)]: Done  49 tasks      | elapsed:    5.0s
[Parallel(n_jobs=6)]: Done  60 tasks      | elapsed:    5.4s
[Parallel(n_jobs=6)]: Done  73 tasks      | elapsed:    5.9s
[Parallel(n_jobs=6)]: Done  90 out of  90 | elapsed:    6.4s finished
C:\Users\username\anaconda3\envs\SDaC\lib\site-packages\sklearn\pipeline.py:296: UserWarning: Persisting input arguments took 1.40s to run.
If this happens often in your code, it can cause performance problems 
(results will be correct in all cases). 
The reason for this is probably some large input arguments for a wrapped
 function (e.g. large strings).
THIS IS A JOBLIB ISSUE. If you can, kindly provide the joblib's team with an
 example so that they can fix the problem.
  **fit_params_steps[name])

C:\Users\username\anaconda3\envs\SDaC\lib\site-packages\sklearn\pipeline.py:296: UserWarning: Persisting input arguments took 5.32s to run.
If this happens often in your code, it can cause performance problems 
(results will be correct in all cases). 
The reason for this is probably some large input arguments for a wrapped
 function (e.g. large strings).
THIS IS A JOBLIB ISSUE. If you can, kindly provide the joblib's team with an
 example so that they can fix the problem.
  **fit_params_steps[name])

Output with error_score='raise'

The above exception was the direct cause of the following exception:

PicklingError                             Traceback (most recent call last)
<ipython-input-247-f1d887547f42> in <module>
     19 )
     20 
---> 21 gsCV_clf.fit(X_train, label_encoder.inverse_transform(Y_train).reshape(-1)) # use class because of Random Forest Classifier
     22 print('hi')

~\anaconda3\envs\SDaC\lib\site-packages\sklearn\utils\validation.py in inner_f(*args, **kwargs)
     70                           FutureWarning)
     71         kwargs.update({k: arg for k, arg in zip(sig.parameters, args)})
---> 72         return f(**kwargs)
     73     return inner_f
     74 

~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in fit(self, X, y, groups, **fit_params)
    734                 return results
    735 
--> 736             self._run_search(evaluate_candidates)
    737 
    738         # For multi-metric evaluation, store the best_index_, best_params_ and

~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in _run_search(self, evaluate_candidates)
   1186     def _run_search(self, evaluate_candidates):
   1187         """Search all candidates in param_grid"""
-> 1188         evaluate_candidates(ParameterGrid(self.param_grid))
   1189 
   1190 

~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in evaluate_candidates(candidate_params)
    713                                for parameters, (train, test)
    714                                in product(candidate_params,
--> 715                                           cv.split(X, y, groups)))
    716 
    717                 if len(out) < 1:

~\anaconda3\envs\SDaC\lib\site-packages\joblib\parallel.py in __call__(self, iterable)
   1052 
   1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
   1055             # Make sure that we get a last message telling us we are done
   1056             elapsed_time = time.time() - self._start_time

~\anaconda3\envs\SDaC\lib\site-packages\joblib\parallel.py in retrieve(self)
    931             try:
    932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
    934                 else:
    935                     self._output.extend(job.get())

~\anaconda3\envs\SDaC\lib\site-packages\joblib\_parallel_backends.py in wrap_future_result(future, timeout)
    540         AsyncResults.get from multiprocessing."""
    541         try:
--> 542             return future.result(timeout=timeout)
    543         except CfTimeoutError as e:
    544             raise TimeoutError from e

~\anaconda3\envs\SDaC\lib\concurrent\futures\_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

~\anaconda3\envs\SDaC\lib\concurrent\futures\_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

PicklingError: ("Can't pickle <class '__main__.CustomTokenizer'>: it's not found as __main__.CustomTokenizer", 'PicklingError while hashing {\'transformer\': CustomTokenizer(), \'X\':       kostenposition_bau_nr_komplett  ...                                               text\n12862                            326  ...                           Fenster Holzfenster AQ 1\n17556                            326  ...       Scheiben verkratzt Holzfenster AQ 7, 8.1-8.2\n11648                            314  ...  Boden am Übergang zwischen Naturstein und Beto...\n2344                             300  ...  Farbverschmutzung Decke (Lampe) Farbverschmutz...\n13097                            326  ...  Sonnenschutz einstellen linkes Fenster klapper...\n...                              ...  ...                                                ...\n17213                            327  ...  105 Küche Fuge Arbeitsplatte Rückwand fehlt Ti...\n4200                             300  ...  offene Hartverfugung Boden (Dusche) offene Har...\n12443                            327  ...   Leichter Versatzder verkleidungsteile am Lich...\n14023                            324  ...        Fuge mit Lücken Bad GU AQ 4, 5, 6, 8.1, 8.2\n3635                             300  ...  reinigen Glashalteleiste (WC) reinigen Glashal...\n\n[9731 rows x 3 columns], \'y\': array([\'326\', \'326\', \'314\', ..., \'327\', \'324\', \'300\'], dtype=\'<U3\'), \'weight\': None, \'message_clsname\': \'Pipeline\', \'message\': None, \'**\': {}}: PicklingError("Can\'t pickle <class \'__main__.CustomTokenizer\'>: it\'s not found as __main__.CustomTokenizer")')
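
The PicklingError in the traceback is raised while the pipeline's inputs are being hashed for the cache. pickle stores a class by reference (module plus name) and looks it up again under that name, so a class that claims to live in __main__ but is not bound there cannot be pickled. Whether this is exactly what happens inside the loky workers here is an assumption. The sketch below only reproduces the same kind of failure with the standard library; UnboundTokenizer and can_pickle are hypothetical names, a stand-in for CustomTokenizer and not code from the question.

```python
import pickle
import sys


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if pickling fails."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # The failed by-reference lookup surfaces as PicklingError;
        # AttributeError is caught too as a defensive measure.
        return False


# Hypothetical stand-in for CustomTokenizer: the class says it lives in
# __main__, but no attribute named UnboundTokenizer exists there yet.
tokenizer_cls = type("UnboundTokenizer", (), {"__module__": "__main__"})
before = can_pickle(tokenizer_cls())

# Binding the class under its own name makes the lookup succeed. A class
# defined in an importable module gets this binding automatically.
setattr(sys.modules["__main__"], "UnboundTokenizer", tokenizer_cls)
after = can_pickle(tokenizer_cls())

print("picklable before binding:", before)
print("picklable after binding:", after)
```

If this is the cause, moving the custom transformers out of the notebook and into an importable module is the usual way to make the lookup succeed in every worker process.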

Additional information

  • scikit-learn 0.23.2

小智 -1

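I tried to deal with the problem of TensorFlow being called inside the model definition, which I believe is instantiated inside GridSearchCV during parallel execution. You can find my model definition below. clear_session() is supposed to free the memory of every graph in the sessions created while the grid search runs in parallel. config.gpu_options.allow_growth = True and config.gpu_options.per_process_gpu_memory_fraction = 0.3 provide a way to use GridSearchCV with n_jobs=-1 (n_jobs > 1). After making these changes I could see multiple training streams running, with no noticeable increase in cumulative memory usage.

Note that the fraction must be large enough for each process to run. With many concurrent processes, the fraction cannot prevent memory from blowing up. One solution is to define the fraction as 1/(number of CPUs), but with many CPUs there is a risk that the fraction is too small for a process to run. In addition, when more than one GPU is available, the total memory of the GPU with the least memory has to be taken into account.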

# Function to create model, required for KerasClassifier
def mlp(
    loss='binary_crossentropy', optimizer='adam',
    metrics=['accuracy'], init_mode='uniform',
    activation='relu', dropout_rate=0.0, weight_constraint=2,
    neurons=12, input_shape=(5,)
):
    from tensorflow.keras.models import Sequential # type: ignore
    from tensorflow.keras.layers import Dense # type: ignore
    from tensorflow.keras.constraints import MaxNorm # type: ignore
    from tensorflow.keras.layers import Dropout # type: ignore

    # This is to try to cope with memory leak caused by graph creation
    # in Tensorflow.
    from tensorflow.keras.backend import clear_session # type: ignore
    clear_session()

    # This is supposed to help with OOM problem when using multiprocess
    # and gridsearch. Including the memory limit.
    from tensorflow.compat.v1 import ConfigProto # type: ignore
    from tensorflow.compat.v1.keras.backend import set_session # type: ignore
    from tensorflow.compat.v1 import Session # type: ignore
    config = ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.per_process_gpu_memory_fraction = 0.3
    set_session(Session(config=config)) # type: ignore

    # create model
    model = Sequential()
    model.add(Dense(
        units=neurons, input_shape=input_shape, kernel_initializer=init_mode,
        activation=activation, kernel_constraint=MaxNorm(weight_constraint))
    )
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, kernel_initializer=init_mode, activation='sigmoid'))
    # Compile model
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model
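
The 1/(number of CPUs) rule mentioned above can be sketched roughly as follows. resolve_workers, gpu_fraction_per_worker and the min_fraction threshold are hypothetical helpers and values introduced for illustration, not part of scikit-learn, joblib or TensorFlow. The handling of negative n_jobs follows joblib's documented convention (-1 means all CPUs).

```python
import os


def resolve_workers(n_jobs):
    """Turn a joblib-style n_jobs value into a concrete worker count."""
    cpus = os.cpu_count() or 1
    if n_jobs is None:
        return 1
    if n_jobs == 0:
        raise ValueError("n_jobs == 0 has no meaning")
    if n_jobs < 0:
        # joblib convention: -1 uses all CPUs, -2 all but one, and so on.
        return max(1, cpus + 1 + n_jobs)
    return n_jobs


def gpu_fraction_per_worker(n_workers, min_fraction=0.1):
    """Split GPU memory evenly and refuse shares below min_fraction.

    min_fraction is an assumed, illustrative threshold; the share a model
    really needs depends on the model and the GPU.
    """
    fraction = 1.0 / n_workers
    if fraction < min_fraction:
        raise ValueError(
            f"{n_workers} workers leave only {fraction:.3f} of GPU memory each"
        )
    return fraction


# Example: three parallel workers each get a third of the GPU memory.
fraction = gpu_fraction_per_worker(resolve_workers(3))
print(fraction)
```

The resulting value could be assigned to config.gpu_options.per_process_gpu_memory_fraction in place of the fixed 0.3, and the ValueError makes the "fraction too small" case explicit instead of failing later with an out-of-memory error.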