TensorFlow: custom data loading + asynchronous computation

Here is an approach that I think is missing from the TF examples.

Tasks:

  1. Samples for each class are given in separate directories, so the labels are indirect (i.e. via the dir)
  2. Decouple loading and computation in TF

Each individual piece can be found elsewhere, but I think putting them all in one place will help save a lot of time for TF beginners (like myself).

Let's solve 1. In my case it is two sets of images:

import os
import numpy as np

# all filenames for .jpg in dir
#  - list of fnames
#  - list of labels 
def path_fnames(f_path, label, ext = ['.jpg', '.jpeg']):
    f_n = [f_path+'/'+f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
    f_l = [label] * len(f_n)
    return f_n, f_l
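# e.g. (hypothetical files, for illustration):
#   path_fnames('/mnt/dataset/class_1', 0)
#   -> (['/mnt/dataset/class_1/a.jpg', '/mnt/dataset/class_1/b.jpg'], [0, 0])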
#     
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels     = labels_dense.shape[0]
    index_offset   = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes),dtype=dtype)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot
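# e.g.:
#   dense_to_one_hot(np.array([0, 1, 1]), num_classes=2)
#   -> [[ 1.  0.]
#       [ 0.  1.]
#       [ 0.  1.]]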

data_dir = '/mnt/dataset/'
dir_1   = '/class_1'
dir_2   = '/class_2'

# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]

f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)

# --- create one-hot labels ---
ohl    = dense_to_one_hot(np.asarray(f_l1+f_l2), num_classes=2, dtype = np.float32)
fnames = f_n1+f_n2                # one-hot labels created in this sequence

Now we have all the filenames and one-hot labels pre-loaded.
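
A quick sanity check at this point could look like this (a sketch; the exact values depend on your data, here there are 1225 files):

print 'num files :', len(fnames)
print 'ohl shape :', ohl.shape    # (num_files, 2)
print 'first pair:', fnames[0], ohl[0]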

Let's move on to 2.

It is based on How to prefetch data using a custom python function in tensorflow. In short, it has:

  • a custom image reader (replace with your own)
  • a queue fnl_q with [filename label] pairs, which the reader uses
  • a queue proc_q with [sample label] pairs, feeding the processing op some_op
  • a thread which runs read_op to get [sample label] and enqueue_op to put the pair into proc_q; the thread is controlled by tf.Coordinator
  • some_op, which first gets data from proc_q via dequeue_many() and then runs the rest of the computation (this can also be put in a separate thread); a toy sketch of the queue mechanics on their own follows this list
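
Here is that sketch (my own minimal illustration using the TF 0.x queue API; it is not part of the pipeline below):

import tensorflow as tf

q   = tf.FIFOQueue(10, [tf.float32])    # capacity 10, one float32 component per element
x   = tf.placeholder(tf.float32)
enq = q.enqueue([x])                    # enqueue a single element
deq = q.dequeue()                       # dequeue a single element

with tf.Session() as sess:
    for v in [1.0, 2.0, 3.0]:
        sess.run(enq, feed_dict={x: v})
    print 'size :', sess.run(q.size())  # 3
    print 'front:', sess.run(deq)       # 1.0 (FIFO order)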

Notes:

  • feature_read_op and label_read_op are two separate ops.
  • I use sleep() to slow things down and control the ops (for testing purposes only)
  • I have separated the "feeding" and "computation" parts; in reality, just run them in parallel

import time
import threading
import tensorflow as tf

print 'TF version:', tf.__version__
# --- params ----
im_s       = [30, 30, 1]   # target image size
BATCH_SIZE = 16

# image reader 
# - fnl_queue: queue with [fn l] pairs 
# Notes 
# - to resize:  image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch = 3, keep = False):
    fn, label = fnl_queue.dequeue()

    if keep:
        fnl_queue.enqueue([fn, label])

    img_bytes = tf.read_file(fn)
    img_u8    = tf.image.decode_jpeg(img_bytes, channels=ch) 
    img_f32   = tf.cast(img_u8, tf.float32)/256.0  
    #img_4     = tf.expand_dims(img_f32,0)
    return img_f32, label

#  load [feature, label] and enqueue to processing queue
# - sess:             tf session 
# - coord:            tf Coordinator
# - [fr_op, lr_op ]:  feature_read_op label_read_op
# - enqueue_op:       [f l] pairs enqueue op
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    i = 0
    while not coord.should_stop():
        # for testing purpose
        time.sleep(0.1)                     
        #print 'load_and_enqueue i=',i
        #i = i +1

        feature, label = sess.run([feature_read_op, label_read_op ])

        feed_dict = {feature_input: feature,
                     label_input  : label}

        sess.run(enqueue_op, feed_dict=feed_dict)


# --- TF part ---

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

#fnl_q    = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q    = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])

# reading_op: feature_read_op label_read_op 
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])

# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=l_s, name='label_input')

#proc_q     = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q     = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])

# test: 
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op   = [img_batch, lab_batch]

# service ops
init_op   = tf.initialize_all_variables()

# let run stuff
with tf.Session() as sess:

    sess.run(init_op)
    sess.run(do_enq)

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    # --- test thread stuff ---
    #  - fill proc_q
    coord = tf.train.Coordinator()
    t = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op))
    t.start()

    time.sleep(2.1)

    coord.request_stop()
    coord.join([t])

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    #  - process a bit 
    ss = sess.run(some_op)
    print 'ss[0].shape', ss[0].shape 
    print ' ss[1]:\n', ss[1]

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval() 

print 'ok'

Typical output:

TF version: 0.6.0

fnl_q.size: 1225
proc_q.size: 0

fnl_q.size: 1204
proc_q.size: 21

ss[0].shape (16, 30, 30, 1)
 ss[1]:
[[ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]]

fnl_q.size: 1204
proc_q.size: 5

ok  

Everything works as expected:

  • a batch of [sample label] pairs is created
  • the pairs are shuffled

The only thing left is to use TF as it is intended, by replacing some_op :)
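
For illustration, a minimal stand-in for some_op could be a single linear softmax layer. This is a hypothetical sketch, not the model actually used; it consumes the img_batch and lab_batch produced above:

n_in   = im_s[0] * im_s[1] * im_s[2]                # 30*30*1 = 900 inputs per image
flat   = tf.reshape(img_batch, [BATCH_SIZE, n_in])  # flatten the image batch
W      = tf.Variable(tf.zeros([n_in, 2]))           # weights for 2 classes
b      = tf.Variable(tf.zeros([2]))
logits = tf.matmul(flat, W) + b
loss   = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_batch))
train_op = tf.train.AdamOptimizer(1e-4).minimize(loss)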

One more question, an observed problem: if I use tf.FIFOQueue for the filenames and tf.RandomShuffleQueue for the samples, no shuffling happens. The other way around (as in the code above) it shuffles perfectly.

Is there anything wrong with shuffling via
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])?


ADD: a version with two threads:

  • one for re-filling/updating/changing the filename queue
  • a second one for filling the processing queue with samples.

Also added the proper way to stop the threads.

def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    try:
        while not coord.should_stop():
            feature, label = sess.run([feature_read_op, label_read_op ])
            feed_dict = {feature_input: feature,
                         label_input  : label}
            sess.run(enqueue_op, feed_dict=feed_dict)
    except Exception as e:
        return


# periodically check the state of fnl queue and if needed refill it
#  - enqueue_op: 'refill' file-name_label queue 
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
    try:
        while not coord.should_stop():
            time.sleep(0.5)
            s = sess.run(fnl_q.size())
            if s < (9*BATCH_SIZE):
                sess.run(enqueue_op)
    except Exception as e:
        return


#  -- ops for feed part --

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

# read op
fnl_q      = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = 'fnl_q')  # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])

# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=LAB_SIZE, name='label_input')
proc_q        = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = 'fe_la_q') 
enqueue_op    = proc_q.enqueue([feature_input, label_input])

# -- ops for training and eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)

# ... here is your model (defining img_ph, lab_ph, keep_prob and logits)

loss       = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer  = tf.train.AdamOptimizer(1e-4).minimize(loss)

with tf.Session() as sess:

    coord = tf.train.Coordinator()
    t_le  = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op) , name = 'load_and_enqueue')
    t_re  = threading.Thread(target=enqueue_fnl, args = (sess, coord, fnl_q, do_fnl_enq), name = 'enqueue_fnl')  # re-enq thread i.e. refiling filename queue 
    t_le.start()
    t_re.start()

    try:
        # training
        for step in xrange(823):
            # some proc
            img_v, lab_v = sess.run([img_batch, lab_batch])
            feed_dict = {img_ph   : img_v,
                         lab_ph   : lab_v,
                         keep_prob: 0.7}
            _, loss_v = sess.run([optimizer, loss], feed_dict=feed_dict)

    except Exception as e:
        print 'Training: Exception:', e


    # stop threads 
    coord.request_stop()                                     # ask to stop
    sess.run(fnl_q.close(cancel_pending_enqueues=True))      # tell fnl_q not to wait for enqueues anymore
    sess.run(proc_q.close(cancel_pending_enqueues=True))     # tell proc_q not to wait for enqueues anymore
    coord.join([t_le, t_re], stop_grace_period_secs=8)       
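
A note on the shutdown sequence: closing the queues with cancel_pending_enqueues=True unblocks any enqueue op still pending inside the worker threads; the resulting error is caught by the except clause in each thread, so both threads return and coord.join() completes.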