在TensorFlow中关闭服务器

foi*_*ois 7 python machine-learning deep-learning grpc tensorflow

当我们想要使用分布式 TensorFlow 时,会通过下面的调用来创建(并阻塞在)一个参数服务器进程:

tf.train.Server.join()
Run Code Online (Sandbox Code Playgroud)

但是,除了直接终止进程之外,我找不到任何关闭服务器的方法。join() 的 TensorFlow 文档写道:

Blocks until the server has shut down.
This method currently blocks forever.
Run Code Online (Sandbox Code Playgroud)

这对我来说非常麻烦,因为我想创建许多服务器进行计算,并在一切结束时关闭它们.

有没有可能的解决方案?

谢谢

Yar*_*tov 12

您可以让参数服务器进程按需退出:用 session.run(dequeue_op) 代替 server.join(),然后在希望该进程退出时,让另一个进程向该队列中放入一些内容。

因此,对于 k 个参数服务器分片,您可以创建 k 个具有唯一 shared_name 属性的队列,并让每个分片尝试从自己的队列中 dequeue。当需要关闭服务器时,循环遍历所有队列,向每个队列 enqueue 一个令牌。这会使 session.run 解除阻塞,Python 进程运行到末尾并退出,从而关闭服务器。

下面是一个自包含的示例,其中包含 2 个分片:https://gist.github.com/yaroslavvb/82a5b5302449530ca5ff59df520c369e

(对于多工作者/多个分片示例,请参阅https://gist.github.com/yaroslavvb/ea1b1bae0a75c4aae593df7eca72d9ca)

import subprocess
import tensorflow as tf
import time
import sys

# Command-line flags (TF 1.x flag API).  "task" is set only for the child
# processes spawned below; the parent launcher leaves it empty.
flags = tf.flags
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS

# setup local cluster from flags
# Two worker tasks on localhost; the ClusterDef is shared by the parent
# (client) process and both spawned server processes.
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()

if __name__=='__main__':
  if not FLAGS.task:  # start servers and run client

      # launch distributed service
      # Re-invoke this same script with --task set, so each child runs the
      # "else" branch below and starts one tf.train.Server.
      # NOTE(review): shell=True with string interpolation is fragile if
      # sys.argv[0] contains spaces — confirm paths before reusing.
      def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
      runcmd("python %s --task=0"%(sys.argv[0]))
      runcmd("python %s --task=1"%(sys.argv[0]))
      # Crude startup wait; assumes both servers bind within 1 second.
      time.sleep(1)

      # bring down distributed service
      # Connect to server 0; ops placed on /job:worker/task:1 are forwarded
      # over gRPC to server 1 automatically.
      sess = tf.Session("grpc://"+host+FLAGS.port1)
      # Shared queues: same shared_name as the queues created inside each
      # server process, so enqueueing here unblocks their dequeue() calls.
      queue0 = tf.FIFOQueue(1, tf.int32, shared_name="queue0")
      queue1 = tf.FIFOQueue(1, tf.int32, shared_name="queue1")
      with tf.device("/job:worker/task:0"):
          add_op0 = tf.add(tf.ones(()), tf.ones(()))
      with tf.device("/job:worker/task:1"):
          add_op1 = tf.add(tf.ones(()), tf.ones(()))

      print("Running computation on server 0")
      print(sess.run(add_op0))
      print("Running computation on server 1")
      print(sess.run(add_op1))

      # Enqueue one token per server; each server is blocked on dequeue()
      # and will run to the end of its script (and exit) once it receives it.
      print("Bringing down server 0")
      sess.run(queue0.enqueue(1))
      print("Bringing down server 1")
      sess.run(queue1.enqueue(1))

  else: # Launch TensorFlow server
    server = tf.train.Server(clusterspec, config=None,
                             job_name="worker",
                             task_index=int(FLAGS.task))
    print("Starting server "+FLAGS.task)
    sess = tf.Session(server.target)
    # Instead of server.join() (which blocks forever), block on a shared
    # queue; the client process enqueues a token to release us.
    queue = tf.FIFOQueue(1, tf.int32, shared_name="queue"+FLAGS.task)
    sess.run(queue.dequeue())
    print("Terminating server"+FLAGS.task)
Run Code Online (Sandbox Code Playgroud)


I'l*_*Hat 5

这个页面经常出现在谷歌上,所以我想我会尝试改进Yaroslav 的答案,为那些刚刚进入分布式 Tensorflow 的人提供我希望更清晰的答案。

import tensorflow as tf
import threading

def main(job_name, task):
    """Run one member of a local TF cluster and shut it down cleanly.

    A ps task blocks on a shared "done" queue instead of ``server.join()``;
    each worker enqueues its task id into every ps queue when finished, so
    every ps process unblocks and exits once all workers are done.

    Args:
        job_name: Either ``'ps'`` or ``'worker'``.
        task: Integer task index within that job.
    """
    # FIX(review): the transcribed host strings contained stray spaces
    # ('localhost: 22224'), which is not a valid host:port for gRPC.
    cluster = tf.train.ClusterSpec({
        'ps': ['localhost:22222', 'localhost:22223'],
        'worker': ['localhost:22224', 'localhost:22225', 'localhost:22226']
    })

    server = tf.train.Server(cluster, job_name=job_name, task_index=task)

    if job_name == 'ps':
        # create a shared queue on the parameter server which is visible on /job:ps/task:%d
        with tf.device('/job:ps/task:%d' % task):
            queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task)

        # wait for the queue to be filled: one token per worker
        with tf.Session(server.target) as sess:
            for _ in range(cluster.num_tasks('worker')):
                # FIX(review): the original discarded the dequeued value and
                # printed the loop index, misreporting which worker signalled
                # "done"; report the actual sender id instead.
                worker_id = sess.run(queue.dequeue())
                print('ps:%d received "done" from worker:%d' % (task, worker_id))
            print('ps:%d quitting' % task)

    elif job_name == 'worker':
        queues = []
        # create a shared queue on the worker which is visible on /job:ps/task:%d
        # (same device placement + shared_name resolves to the ps's queue)
        for i in range(cluster.num_tasks('ps')):
            with tf.device('/job:ps/task:%d' % i):
                queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i))

        # fill the queue: tell every ps that this worker is finished
        with tf.Session(server.target) as sess:
            for i in range(cluster.num_tasks('ps')):
                _, size = sess.run([queues[i].enqueue(task), queues[i].size()])
                print('Worker:%d sending "done" to ps:%d [elements=%d]' % (task, i, size))

if __name__ == '__main__':
    # Run the whole cluster in-process: 2 ps tasks and 3 worker tasks,
    # each as its own thread executing main().
    specs = [('ps', 0), ('ps', 1), ('worker', 0), ('worker', 1), ('worker', 2)]
    members = [threading.Thread(target=main, args=spec) for spec in specs]
    for member in members:
        member.start()
    for member in members:
        member.join()
Run Code Online (Sandbox Code Playgroud)

通过使用以下代码段替换代码的工作部分,扩展“规范”分布式 Tensorflow 示例非常简单:

    # create a worker that does nothing
    # (fragment: replaces the worker branch of main() above when using a
    # MonitoredTrainingSession-based training loop)
    elif job_name == 'worker':
        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d' % task, cluster=cluster)):
            global_step = tf.train.get_or_create_global_step()
            no_op = tf.no_op()

        done_ops = []
        # create a shared queue on the worker which is visible on /job:ps/task:%d
        for i in range(cluster.num_tasks('ps')):
            with tf.device('/job:ps/task:%d' % i):
                done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i))
                done_ops.append(done_queue.enqueue(task))

        # StopAtStepHook ends the session after one step; FinalOpsHook runs
        # the enqueue ops at session close, signalling every ps to shut down.
        # NOTE(review): done_ops is wrapped in an extra list here — sess.run
        # accepts nested structures, so this works, but [done_ops] and
        # done_ops are equivalent; confirm before simplifying.
        hooks=[tf.train.StopAtStepHook(last_step=1),
               tf.train.FinalOpsHook([done_ops])]

        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(task == 0),
                                               hooks=hooks) as sess:
            sess.run([no_op])
Run Code Online (Sandbox Code Playgroud)

请注意,MonitoredTrainingSession 版本在将所有工作人员连接在一起时似乎要慢得多。