在多个GPU上运行相同的模型,但是向每个GPU发送不同的用户数据

Bal*_*aji 8 python distributed pyspark keras tensorflow

任何人都可以成功实现高效的数据并行,您可以将相同的模型定义发送到多个GPU,但是向每个GPU发送不同的用户数据?

看起来dist-keras可能很有希望.但我很乐意听到有关这些方面采取的任何方法的反馈.

我们有用户行为数据:100k用户,200个字段(单热矢量),每个用户30,000条记录.我们使用Keras在Tensorflow之上构建了一个RNN,以预测仅为1个用户采取的下一个行动(20多个可能的行动中).在1个GPU上训练需要大约30分钟.(我的盒子有8个GPU).现在,我们想为所有100k用户构建模型.

我们能够使用多GPU方法为单用户数据执行数据并行.

但由于该模型每个用户需要30分钟,并且有10万用户,我们希望按用户对数据进行分区,并使用群集以分布式方式为每个用户数据运行相同的模型,并为该用户生成模型输出.

我目前正在使用带有TensorFlow 1.4的Keras 2.1.x.

mod*_*itt 4

这并不完全是您所描述的,但是,可能有效的方法是获取每个批次的切片,并通过获取模型并构建一个自动执行此操作的单独模型来分别在不同的 GPU 上训练它们。

假设我们想要使模型并行化,然后在硬件之间训练期间分割其批次。

def make_parallel(model, gpu_count):
    """
    make a paralellized model from the input model on the
    given gpu count that splits the input batch amongst the 
    hardware.

    :param model: The model you want to make parallel
    :param gpu_count: The gpu count
    :return: The parellelized model
    """
    def get_slice(data, idx, parts): # take a slice of the batch
        shape = tf.shape(data)
        size = tf.concat([shape[:1] // parts, shape[1:]], axis=0)
        stride = tf.concat([shape[:1] // parts, shape[1:] * 0], axis=0)
        start = stride * idx
        return tf.slice(data, start, size)

    outputs_all = [[] for i in range(len(model.outputs))]

    # Place a copy of the model on each GPU, each getting a slice of the batch
    for i in range(gpu_count):
        with tf.device('/gpu:%d' % i):
            with tf.name_scope('tower_%d' % i) as scope:
                inputs = []
                for x in model.inputs:
                    input_shape = tuple(x.get_shape().as_list())[1:]
                    slice_n = Lambda(get_slice, output_shape=input_shape, arguments={'idx': i, 'parts': gpu_count})(x)
                    inputs.append(slice_n)

                outputs = model(inputs)

                if not isinstance(outputs, list):
                    outputs = [outputs]

                # Save all outputs to be joined at a later date
                for l in range(len(outputs)):
                    outputs_all[l].append(outputs[l])

    # merge outputs on CPU
    with tf.device('/cpu:0'):
        merged = [merge(output, mode='concat', concat_axis=0) for output in outputs_all]
        return Model(input=model.inputs, output=merged)
Run Code Online (Sandbox Code Playgroud)

在这个模型上训练时你能报告速度结果吗?