Mar*_*rio 7 python multithreading multiprocessing deep-learning keras
我假设像keras / tensorflow / ...这样的大多数框架会自动使用所有CPU内核,但实际上它们似乎并没有。我只是发现很少的资源可以导致我们在深度学习过程中使用CPU的全部容量。我找到了一篇有关
from multiprocessing import Pool
import psutil
import ray
Run Code Online (Sandbox Code Playgroud)
另一方面,基于在多个过程中使用keras模型的答案,没有上述库的踪迹。是否有更优雅的方式利用Keras 的多处理功能,因为它在实施中非常受欢迎。
例如,如何通过简单的RNN实现进行修改,以在学习过程中实现至少50%的CPU容量?
我应该使用第二模型作为多任务处理(如LSTM)吗?我的意思是我们可以使用更多的CPU来同时管理运行多个模型吗?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i%3==2]
Y_train= df[index]
df = df.values
#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
dataX,dataY = [],[]
print("Len:",len(dataset)-look_back-1)
for i in range(len(dataset)-look_back-1):
a = dataset[i:(i+look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train=np.array(Y_train)
df=np.array(df)
look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#predict
Y_train=np.array(trainY)
Y_test=np.array(testX)
Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)
train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)
# create and fit the Simple LSTM model as 2nd model for multi-tasking
#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#Y_train=np.array(trainY)
#Y_test=np.array(testX)
#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)
#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)
#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
ax=plt.plot(hist_RNN.history['loss'] ,label='Train loss')
ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()
plt.subplot(1, 2, 2)
ax=plt.plot(hist_LSTM.history['loss'] ,label='Train loss')
ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('LSTM Loss on Train and Test data')
plt.legend()
plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()
Run Code Online (Sandbox Code Playgroud)
注意:仅当我访问没有VGA的强大服务器时,我才访问CUDA。我的目标是利用多处理和多线程来最大程度地利用CPU而不是30%的容量,这意味着只有四核才能拥有一个核!任何建议将不胜感激。我已经上传了格式化的csv数据集。
更新:我的硬件配置如下:
rep*_*ved 10
训练一个模型并不会完全使用100%的CPU是一件好事!现在,我们有空间并行训练多个模型,并加快您的总体训练时间。
注意:如果您只是想加快此模型的速度,请查看GPU或更改超参数,例如批大小和神经元数量(层大小)。
这是用于multiprocessing同时训练多个模型的方法(使用在计算机的每个单独CPU内核上并行运行的进程)。
该multiprocessing.Pool基本创建的作业池需要做。流程将拾取并运行这些作业。作业完成后,该过程将从池中提取另一个作业。
import time
import signal
import multiprocessing
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(layer_size):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
import keras
from keras.models import Sequential
from keras.layers import Dense
print(f'Training a model with layer size {layer_size}')
# build your model here
model_RNN = Sequential()
model_RNN.add(Dense(layer_size))
# fit the model (the bit that takes time!)
model_RNN.fit(...)
# lets demonstrate with a sleep timer
time.sleep(5)
# save trained model to a file
model_RNN.save(...)
# you can also return values eg. the eval score
return model_RNN.evaluate(...)
num_workers = 4
hyperparams = [800, 960, 1100]
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)
Run Code Online (Sandbox Code Playgroud)
输出:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]
Run Code Online (Sandbox Code Playgroud)
time.sleep代码中的a很容易说明这一点。您会看到所有3个过程都开始了培训工作,然后它们都几乎同时完成。如果这是单个处理的,则必须等待每个处理完成后才能开始下一个(打哈欠!)。
EDIT OP还需要完整的代码。这在Stack Overflow上很困难,因为我无法在您的环境和您的代码中进行测试。我已经自由地将您的代码复制并粘贴到上面的模板中。您可能需要添加一些导入,但这与您获得“可运行的”和“完整的”代码非常接近。
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(model_type):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
from keras.layers import LSTM, SimpleRNN, Dense, Activation
from keras.models import Sequential
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.layers.normalization import BatchNormalization
print(f'Training a model: {model_type}')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]
model = Sequential()
if model_type == 'rnn':
model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
elif model_type == 'lstm':
model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model.add(Dense(480))
model.add(BatchNormalization())
model.add(Activation('tanh'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(
trainX,
trainY,
epochs=50,
batch_size=20,
validation_data=(testX, testY),
verbose=1,
callbacks=callbacks,
)
# predict
Y_Train_pred = model.predict(trainX)
Y_Test_pred = model.predict(testX)
train_MSE = mean_squared_error(trainY, Y_Train_pred)
test_MSE = mean_squared_error(testY, Y_Test_pred)
# you can also return values eg. the eval score
return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}
# Your code
# ---------
df = pd.read_csv("D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values
# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
dataX, dataY = [], []
print("Len:", len(dataset) - look_back - 1)
for i in range(len(dataset) - look_back - 1):
a = dataset[i : (i + look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
trainX, trainY, test_size=0.2, shuffle=False
)
# My Code
# -------
num_workers = 2
model_types = ['rnn', 'lstm']
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)
Run Code Online (Sandbox Code Playgroud)
程序输出:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
783 次 |
| 最近记录: |