我试图得到一个分布式TensorFlow工作的一个非常简单的例子.但是,我有一个在运行之间看起来不确定的错误.在某些运行中,它完美地运行.输出以下内容:
Worker 2 | step 0
Worker 0 | step 0
Worker 1 | step 0
Worker 3 | step 0
Worker 2 | step 1
Worker 0 | step 1
Worker 1 | step 1
Worker 3 | step 1
...
Run Code Online (Sandbox Code Playgroud)
但是,每隔一段时间,一个或多个工作人员就无法运行,导致输出如下:
Worker 0 | step 0
Worker 3 | step 0
Worker 0 | step 1
Worker 3 | step 1
Worker 0 | step 2
Worker 3 | step 2
...
Run Code Online (Sandbox Code Playgroud)
如果我无限期地运行循环,似乎缺少的工作人员总是在某个时刻启动,但仅仅几分钟后,这是不切实际的.
我发现有两件事情会让问题消失(但是让程序变得无用):1.不在with tf.device(tf.train.replica_device_setter())范围内声明任何变量.如果我甚至声明一个变量(例如nasty_var下面),问题就会开始出现.和2设置is_chiefPARAM在tf.train.MonitoredTrainingSession()给True所有工人.即使声明了变量,这也会导致bug消失,但是让所有的工作人员成为主管似乎是错误的.我目前在下面设置它的方式is_chief=(task_index == 0)- 直接来自TensorFlow教程.
这是我可以复制问题的最简单的代码.(您可能需要多次运行才能看到错误,但它几乎总是在5次运行中显示出来
from multiprocessing import Process
import tensorflow as tf
from time import sleep
from numpy.random import random_sample
cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'],
'worker': ['localhost:2223',
'localhost:2224',
'localhost:2225',
'localhost:2226']})
def create_worker(task_index):
server = tf.train.Server(cluster, job_name='worker', task_index=task_index)
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
nasty_var = tf.Variable(0) # This line causes the problem. No issue when this is commented out.
with tf.train.MonitoredTrainingSession(master=server.target, is_chief=(task_index == 0)):
for step in xrange(10000):
sleep(random_sample()) # Simulate some work being done.
print 'Worker %d | step %d' % (task_index, step)
def create_ps(task_index):
param_server = tf.train.Server(cluster, job_name='ps',
task_index=task_index)
param_server.join()
# Launch workers and ps in separate processes.
processes = []
for i in xrange(len(cluster.as_dict()['worker'])):
print 'Forking worker process ', i
p = Process(target=create_worker, args=[i])
p.start()
processes.append(p)
for i in xrange(len(cluster.as_dict()['ps'])):
print 'Forking ps process ', i
p = Process(target=create_ps, args=[i])
p.start()
processes.append(p)
for p in processes:
p.join()
Run Code Online (Sandbox Code Playgroud)
我猜这里的原因是tf.train.MonitoredTrainingSession启动方式中的隐式协调协议,这是在这里实现的:
如果这次会议是主席:
否则(如果本次会议不是主席):
(我在关于分布式TensorFlow的视频中讨论了该协议背后的基本原理.)
当每个会话都是主要会话,或者没有要初始化的变量时,tf.train.MonitoredTrainingSession将始终立即开始.但是,一旦有一个变量,而你只有一个负责人,你就会发现非首席工作人员必须等待主管采取行动.
使用该协议的原因在于它对于各种进程失败是健壮的,并且与在典型的分布式训练作业的预期运行时间相比,在单个进程上运行所有内容时非常明显的延迟是很短的.
再看一下这个实现,似乎这个30秒的超时应该是可配置的(作为recovery_wait_secs参数tf.train.SessionManager()),但是当你创建一个时tf.train.MonitoredTrainingSession,目前没有办法设置这个超时,因为它使用一组硬编码的参数来创建会话管理员.这似乎是API的疏忽,所以请随时在GitHub问题页面上打开功能请求!
| 归档时间: |
|
| 查看次数: |
1835 次 |
| 最近记录: |