如何将张量推送到TensorFlow队列并从另一个进程中拉出它们?

Min*_*ark 4 queue tensorflow

我有一个TensorFlow集群启动并运行,我正在尝试使用一个客户端进程将数据入队,并将其从另一个进程出列.我不能让这个工作,我做错了什么?

这是推送数据的程序:

# queue_push.py
import tensorflow as tf
import time

with tf.container("qtest"):
    q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32],
                     shapes=[[]], name="q")
    v = tf.placeholder(tf.float32, shape=())
    enqueue = q.enqueue([v])

with tf.Session("grpc://localhost:2210") as sess:
    while True:
        t = time.time()
        print(t)
        sess.run(enqueue, feed_dict={v: t})
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

我的程序来拉数据:

# queue_pull.py
import tensorflow as tf
import time

with tf.container("qtest"):
    q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32],
                     shapes=[[]], name="q")
    dequeue = q.dequeue()

with tf.Session("grpc://localhost:2222") as sess:
    while True:
        v = sess.run(dequeue)
        print("Pulled:", v)
        time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)

当我运行它们时,这就是我得到的:

$ python queue_push.py
1472420887.974484
1472420888.991067
1472420889.995756
1472420890.998365
1472420892.001799
1472420893.008567
1472420894.011109
1472420895.014532
1472420896.02017
1472420897.024806
1472420898.03187
(then blocked forever)
Run Code Online (Sandbox Code Playgroud)

并行:

$ python queue_pull.py
(blocked forever)
Run Code Online (Sandbox Code Playgroud)

mrr*_*rry 5

tf.FIFOQueue在多个会话之间共享(或其他TensorFlow队列),您需要将可选shared_name参数传递给构造函数,并将其设置为每个实例中的相同字符串.例如,您可以q在两个脚本中创建如下:

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[[]],
                 shared_name="shared_q", name="q")
Run Code Online (Sandbox Code Playgroud)

shared_name不必是相同name的队列,但它确实需要在同一容器中创建的相同设备上的所有其他队列中是唯一的.您可以在不使用with tf.container():块的情况下共享队列; 该tf.container()块提供了一种对有状态对象进行分组的方法,以便可以有选择地清除它们(使用tf.Session.reset()).


NB默认共享行为对于tf.Variable对象是不同的,默认情况下这些对象基于其name属性共享.否则,TensorFlow图中的所有共享对象(队列,读取器等)仅在您shared_name的构造函数中设置参数时才会共享.