Thread-safe Keras generator for model.fit_generator with Python 3.6.x

Szy*_*cot 5 multithreading generator python-3.x keras

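I am using Keras 2.0.8 for a 2D U-net medical segmentation project. At the moment I am struggling to create a custom thread-safe image generator (for both X and y). X and y are 4D arrays of shape n_img x n_col x n_row x T, where T is 4 for X and 1 for y (the 4 numeric labels are converted to a one-hot encoding along the 4th dimension).

Here is my code: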

def gen_tr(X,y,batch_size):
    n=np.floor((len(X)-1)/batch_size).astype(int)
    s=list(X.shape)
    s[0]=batch_size
    while 1:
        for i in range(n):
            j=0
            X_b=np.zeros(s,dtype=np.float32)
            y_b=np.zeros(s,dtype=int)
            while j<batch_size:
                data=distort_imgs(X[i*batch_size+j,:,:,0, np.newaxis],
                              X[i*batch_size+j,:,:,1, np.newaxis], 
                              X[i*batch_size+j,:,:,2, np.newaxis],
                              X[i*batch_size+j,:,:,3, np.newaxis], 
                              y[i*batch_size+j,:,:,0, np.newaxis])
                X_i=np.concatenate(data[:4],axis=2)
                y_i=data[-1]
                y_i=np.concatenate((y_i==0,y_i==1,y_i==2,y_i==4),
                               axis=2).astype(int)
                X_b[j]=X_i
                y_b[j]=y_i
                j+=1
            yield (X_b,y_b)
batch_size=20
gen = gen_tr(X_train,Y_train,batch_size)
steps=np.floor((len(X_train)-1)/batch_size).astype(int)
model.fit_generator(gen,steps_per_epoch=steps, epochs=5, verbose=1, shuffle=True, 
max_queue_size=10,workers=2, use_multiprocessing=False)

Error:

Exception in thread Thread-13:
Traceback (most recent call last):
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
    generator_output = next(self._generator)
ValueError: generator already executing

Traceback (most recent call last):

  File "<ipython-input-17-1a91cea3a91e>", line 7, in <module>
    max_queue_size=10,workers=2, use_multiprocessing=False)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
    generator_output = next(output_generator)

StopIteration

I have tried the following solutions: keunwoochoi.wordpress.com and stanford (the same one).

Neither of them worked. When I add:

import threading
class threadsafe_iter:
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return self.it.next()

def threadsafe_generator(f):
    def g(*a, **kw):
        return threadsafe_iter(f(*a, **kw))
    return g

@threadsafe_generator
#now goes my generator from above

I get this error:

Epoch 1/5
Exception in thread Thread-10:
Traceback (most recent call last):
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\utils\data_utils.py", line 568, in data_generator_task
    generator_output = next(self._generator)
  File "<ipython-input-12-24605a93d655>", line 17, in __next__
    return self.it.next()
AttributeError: 'generator' object has no attribute 'next'

Traceback (most recent call last):

  File "<ipython-input-13-b07830ef87c0>", line 5, in <module>
    max_queue_size=10,workers=2, use_multiprocessing=False)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\legacy\interfaces.py", line 87, in wrapper
    return func(*args, **kwargs)

  File "D:\Users\SZ_KOCOT\Anaconda3\envs\cnn1\lib\site-packages\keras\engine\training.py", line 2011, in fit_generator
    generator_output = next(output_generator)

StopIteration

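With workers=1 in fit_generator everything works fine (using the generator without the thread-safety code above), including next(gen) and gen.__next__().

The performance of the data generator with 1 thread is insufficient, especially since I have multiple cores available...

Can anyone help me solve this? I am new to threading in Python.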

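EDIT: I have found a solution/workaround. It may be a bit of overkill for Keras, but it works. It is inspired by zsdonghao. By splitting the dataset augmentation into 10 parts of 2750 samples each, I am able to prepare the data very quickly and use the GTX 1080 at almost 100%. Memory usage does not exceed ~22 GB. Training 1 epoch takes about 14-15 minutes, and data preparation/augmentation takes 10-12 minutes in total. Compared with fit_generator using a single worker, that is a reduction of more than 3x.

In case it helps someone, here is the exact code: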

import time

import numpy as np
import pandas as pd
import tensorlayer as tl

batch_size=20
epochs=10
step_size=2750
steps=np.floor((len(X_train)-1)/step_size).astype(int)
s=list(X_train.shape)
train_all=pd.DataFrame()
eval_all=pd.DataFrame()

#training and evaluation
for i in range(epochs):
    start_time = time.clock()
    print('Epoch: {0:02d}'.format(i+1))
    for j in range(steps):
        ind=range(step_size*j,step_size*(j+1))
        data = tl.prepro.threading_data([_ for _ in zip(X_train[ind,:,:,0, np.newaxis],
                                                        X_train[ind,:,:,1, np.newaxis], 
                                                        X_train[ind,:,:,2, np.newaxis],
                                                        X_train[ind,:,:,3, np.newaxis],
                                                        y_train[ind])],fn=distort_imgs,thread_count=None)
        X_s = data[:,0:4,:,:,:]                                                 
        y_s = data[:,4,:,:,:]
        X_s = X_s.transpose((0,2,3,1,4))
        X_s.shape = (step_size, s[1], s[2], s[3])
        y_s=np.concatenate((y_s==0,y_s==1,y_s==2,y_s==4),
                                           axis=3).astype(int)
        train=model.fit(X_s, y_s,class_weight=weights, verbose=0,
                        batch_size=batch_size, epochs=i+2,initial_epoch=i+1)
        train.history['epoch']=i+1
        train.history['step']=j+1
        train=pd.DataFrame(train.history)
        train_all=pd.concat([train_all,train],ignore_index=True)
        print(train.to_string(index=False))
    eval=model.evaluate(X_test, y_test, batch_size=batch_size, verbose=0)
    eval=pd.DataFrame({'val_dice_coe':eval[0],'val_dice_hard_coe':eval[1], 'val_iou_coe':eval[2], 'val_loss':eval[3]},index=[0])
    eval['epoch']=i+1
    eval_all=pd.concat([eval_all,eval],ignore_index=True)
    print(eval.to_string(index=False))
    model.save('{0}_ep_{1}.h5'.format(model_name,i+1))
    print('Epoch {0:02d} took: {1:.3f} min'.format(i+1,(time.clock()-start_time)/60))

Dan*_*ler 2

In Python 3, you should use next(self.it) instead of self.it.next().
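As a minimal sketch of that fix, the wrapper from the question can be written with the built-in next(). The class is renamed to ThreadSafeIter here, and count_batches and consume are hypothetical stand-ins for the real batch generator and for the Keras worker threads; they are not part of the original code.

```python
import threading


class ThreadSafeIter:
    """Wrap an iterator so only one thread advances it at a time."""

    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)  # Python 3: next(it), not it.next()


def threadsafe_generator(f):
    """Decorator: make a generator function return a ThreadSafeIter."""
    def g(*a, **kw):
        return ThreadSafeIter(f(*a, **kw))
    return g


@threadsafe_generator
def count_batches(n):
    # Hypothetical stand-in for the real batch generator.
    for i in range(n):
        yield i


def consume(gen, out):
    # Hypothetical stand-in for a worker thread pulling batches.
    for item in gen:
        out.append(item)


gen = count_batches(1000)
results = []
threads = [threading.Thread(target=consume, args=(gen, results))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Note that the lock serializes the calls into the generator, so this should remove the "generator already executing" error but is unlikely, by itself, to make the augmentation run in parallel.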

You can also try using Keras Sequences, which seem safer because they are indexed, so the correct data order is preserved when multiprocessing.
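A rough sketch of what such a Sequence might look like is below. A Keras Sequence implements __len__ and __getitem__; the class name BatchSequence and the optional augment hook are assumptions made for illustration, and the import fallback exists only so the sketch runs without Keras installed.

```python
import math

try:
    from keras.utils import Sequence  # base class provided by Keras 2.x
except ImportError:
    Sequence = object  # fallback so the sketch runs without Keras


class BatchSequence(Sequence):
    """Hypothetical Sequence serving batch `idx` from in-memory arrays."""

    def __init__(self, x, y, batch_size, augment=None):
        super().__init__()
        self.x, self.y = x, y
        self.batch_size = batch_size
        # Optional callable (x_batch, y_batch) -> (x_batch, y_batch).
        self.augment = augment

    def __len__(self):
        # Number of batches per epoch; the last batch may be smaller.
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):
        lo = idx * self.batch_size
        hi = lo + self.batch_size
        x_b, y_b = self.x[lo:hi], self.y[lo:hi]
        if self.augment is not None:
            x_b, y_b = self.augment(x_b, y_b)
        return x_b, y_b


# Plain lists are used here only to keep the demo dependency-free;
# in practice x and y would be NumPy arrays.
seq = BatchSequence(list(range(10)), list(range(10, 20)), batch_size=4)
first_x, first_y = seq[0]
```

Because each batch is requested by index rather than by advancing shared generator state, workers do not contend for a single generator. An instance would be passed to model.fit_generator in place of the generator.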

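Finally, it seems that workers only affects the generator itself, not the model. In my tests (I'm not good at threading either...), the only difference I could see with more workers was a larger queue of preloaded data waiting to go into the model.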
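The queueing behaviour described above can be pictured with a simplified producer/consumer model. This is only an illustration under assumptions, not Keras's actual implementation: prefetch, make_batch and slow_batch are hypothetical names, and the consumer loop stands in for the model taking one batch at a time.

```python
import queue
import threading
import time


def prefetch(make_batch, n_batches, workers, max_queue_size=10):
    """Yield n_batches results of make_batch(i), built by worker threads."""
    tasks = queue.Queue()
    for i in range(n_batches):
        tasks.put(i)
    ready = queue.Queue(maxsize=max_queue_size)

    def worker():
        while True:
            try:
                i = tasks.get_nowait()
            except queue.Empty:
                return  # no batches left to prepare
            ready.put(make_batch(i))  # blocks while the queue is full

    for _ in range(workers):
        threading.Thread(target=worker, daemon=True).start()
    for _ in range(n_batches):
        yield ready.get()  # the consumer still takes one batch at a time


def slow_batch(i):
    time.sleep(0.01)  # stand-in for the augmentation cost
    return i


batches = list(prefetch(slow_batch, n_batches=20, workers=4))
```

In this model, adding workers only helps when preparing a batch is slower than consuming it; otherwise the queue simply stays full. Batches may also arrive out of order.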

  • The exception in thread Thread-10 occurs because you are not using a thread-safe Keras Sequence; that is why you get this kind of error (2 upvotes)