如何使用张量流中的QueueRunner将动态创建的输入图像添加到RandomShuffleQueue

aph*_*ent 2 python numpy neural-network conv-neural-network tensorflow

我正在尝试用程序执行期间创建的图像训练CNN.我有一个游戏环境(不是由我创建的),它生成依赖于游戏中所采取的动作的屏幕图像.这些动作由学习的CNN控制.

然后将这些图像推入RandomShuffleQueue,从中将小批量出列并用于在正确的动作上训练CNN.我想异步地进行游戏(游戏和训练),游戏正在播放,并且它的屏幕被添加到RandomShuffleQueue中,该线程用于训练模型.这是我正在尝试的非常简化的版本.

import tensorflow as tf
from game_env import game


experience = tf.RandomShuffleQueue(10000,
                                1000, tf.float32,
                                shapes = [32,32],  
                                name = 'experience_replay')

def perceive(game):
    rawstate = game.grab_screen()
    enq = experience.enqueue(rawstate)
    return enq

#create threads to play the game and collect experience
available_threads = 4
coord = tf.train.Coordinator()
experience_runner = tf.train.QueueRunner(experience,
                                [perceive(game()) for num in range(available_threads)])

sess = tf.Session()
sess.run(tf.initialize_all_variables())
enqueue_threads = experience_runner.create_threads(sess, coord = coord, start = True)

with sess.as_default():
    while(1):
        print sess.run(experience.dequeue())
        time.sleep(.5)
Run Code Online (Sandbox Code Playgroud)

同时,game_env看起来像这样:

import tensorflow as tf
class game(object): 
    def __init__(self):
        self.screen_size = [32,32]
        self.counter = 0

    def grab_screen(self):
        """current screen of the game"""
        self.counter += 1
        screen = self.counter*tf.ones(self.screen_size)
        return screen
Run Code Online (Sandbox Code Playgroud)

如您所见,游戏环境现在非常简单:每次执行屏幕抓取时,计数器都会递增,并返回一个用计数器(正确大小)填充的图像.

应该注意的是,我编写了上面的类只是为了测试,一般来说,grab_screen可以返回任何numpy nd-array.此外它不是我写的,所以我可以调用grab_screen而不是在真实内容中进行任何更改.

现在,问题是经验队列似乎只持有一个的张量(即计数器只更新一次!!)

样本输出:

I tensorflow/core/common_runtime/local_device.cc:40] Local device intra op parallelism threads: 4
I tensorflow/core/common_runtime/direct_session.cc:58] Direct session inter op parallelism threads: 4

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]

[[ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 ...,
 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]

 [ 1.  1.  1. ...,  1.  1.  1.]]
Run Code Online (Sandbox Code Playgroud)

等等.我的问题是:如何动态创建输入图像以便像这样排入RandomShuffleQueue?谢谢!

mrr*_*rry 5

问题可以追溯到这一行,它定义了tf.train.QueueRunner:

experience_runner = tf.train.QueueRunner(
    experience, [perceive(game()) for num in range(available_threads)])
Run Code Online (Sandbox Code Playgroud)

这会创建四个(available_threads)操作,每次运行时,都会将填充了1.0的张量排入experience队列.逐步完成列表理解中发生的事情应该更清楚.以下情况发生四次:

  1. game构造一个对象.
  2. 它传递给perceive().
  3. perceive()调用game.grab_screen()一次,使计数器递增,并返回张量1 * tf.ones(self.screen_size)
  4. percieve()将此张量传递给experience.enqueue()并返回结果op.

QueueRunner.create_threads()调用为每个enqueue操作创建一个线程,并且这些线程在无限循环中运行(当队列达到容量时阻塞).

要获得所需的效果,每次排队体验时,都应使用Feed机制和占位符为抓取的屏幕传递不同的值.这取决于您的game类的实现方式,但您可能也想初始化该类的单个实例.最后,目前尚不清楚是否需要多个入队线程,但我们假设它game.grab_screen()是线程安全的并允许一些并发.鉴于这一切,似乎合理的版本如下(请注意,您需要创建自定义线程而不是QueueRunner使用馈送):

import tensorflow as tf
from game_env import game

experience = tf.RandomShuffleQueue(10000,
                                   1000, tf.float32,
                                   shapes=[32,32],  
                                   name='experience_replay')

screen_placeholder = tf.placeholder(tf.float32, [32, 32])
# You can create a single enqueue op and dequeued tensor, and reuse these from
# multiple threads.
enqueue_op = experience.enqueue(screen_placeholder)
dequeued_t = experience.dequeue()
# ...

init_op = tf.initialize_all_variables()

game_obj = game()

sess = tf.Session()
coord = tf.train.Coordinator()

# Define a custom thread for running the enqueue op that grabs a new
# screen in a loop and feeds it to the placeholder.
def enqueue_thread():
    with coord.stop_on_exception():
        while not coord.should_stop():
            screen_val = game_obj.grab_screen()
            # Run the same op, but feed a different value for the screen.
            sess.run(enqueue_op, feed_dict={screen_placeholder: screen_val}) 

available_threads = 4
for _ in range(available_threads):
    threading.Thread(target=enqueue_thread).start()


while True:
    # N.B. It's more efficient to reuse the same dequeue op in a loop.
    print sess.run(dequeued_t)
    time.sleep(0.5)
Run Code Online (Sandbox Code Playgroud)