我有一个文件train.csv,其中包含图像及其标签的路径.即:
img1.jpg 3
img2.jpg 1
...
Run Code Online (Sandbox Code Playgroud)
在阅读了阅读数据教程之后,我想出了一些代码来遍历每个图像,调整大小并应用扭曲:
def apply_distortions(resized_image):
# do a bunch of tf.image distortion...
return float_image
def processing(filename):
file_contents = tf.read_file(filename)
image = tf.image.decode_jpeg(file_contents, channels=3)
resized_image = tf.image.resize_images(image, 299, 299)
distorted_image = apply_distortions(resized_image)
return distorted_image
def parse_csv(filename_queue):
line_reader = tf.TextLineReader()
key, line = line_reader.read(filename_queue)
filename, label = tf.decode_csv(line, # line_batch or line (depending if you want to batch)
record_defaults=[tf.constant([],dtype=tf.string),
tf.constant([],dtype=tf.int32)],
field_delim=' ')
processed_image = processing(filename)
return processed_image, label
Run Code Online (Sandbox Code Playgroud)
现在的问题是我很困惑如何在文件中并行执行这些操作.文档建议使用tf.train.batch_join或tf.train.batch使用num_threads = N.
我首先尝试使用示例代码,tf.train.batch_join但这似乎是为了并行处理多个文件.在我的情况下,我只有1个文件.
filename_queue = tf.train.string_input_producer(["train.txt"], num_epochs=1, shuffle=True)
example_list = [parse_csv(filename_queue) for _ in range(8)]
example_batch, label_batch = tf.train.batch_join(example_list, batch_size)
Run Code Online (Sandbox Code Playgroud)
我也试过设置tf.train.batch([example, label], batch_size, num_threads=8)但是我不清楚这是不是正确的事情(虽然我可以看到更多的cpu核心在使用)
filename_queue = tf.train.string_input_producer(["train.txt"], num_epochs=1, shuffle=True)
example, label = parse_csv(filename_queue)
example_batch, label_batch = tf.train.batch([example, label], batch_size, num_threads=8)
Run Code Online (Sandbox Code Playgroud)
这是我执行图表的代码:
sess.run(tf.initialize_all_variables())
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess,coord)
try:
while not coord.should_stop():
X, Y = sess.run([example_batch, label_batch])
# Now run a training step
except tf.errors.OutOfRangeError:
print('Done training -- epoch limit reached')
finally:
# When done, ask the threads to stop.
coord.request_stop()
coord.join(threads)
sess.close()
Run Code Online (Sandbox Code Playgroud)
什么是并行处理此文件的最佳方法?
两者似乎都是可行的方法.使用batchwith threads=N将创建N连接到队列的阅读器副本,以便它们可以并行运行,同时batch_join您必须手动创建副本.
在您的使用中,batch_join您正在创建多个副本TextLineReader(如您所见)将仅跨文件并行化.要让多个线程读取单个文件,您可以使用同一个读取器创建一个TextLineReader并使用多个line_reader.read操作.
这是一个包含数字的文本文件的示例
生成数字:
num_files=10
num_entries_per_file=10
file_root="/temp/pipeline"
os.system('mkdir -p '+file_root)
for fi in range(num_files):
fname = file_root+"/"+str(fi)
dump_numbers_to_file(fname, fi*num_entries_per_file, (fi+1)*num_entries_per_file)
Run Code Online (Sandbox Code Playgroud)
分批读取那些大小为2的数字,并行度为2
ops.reset_default_graph()
filename_queue = tf.train.string_input_producer(["/temp/pipeline/0",
"/temp/pipeline/1"],
shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
numeric_val1, = tf.decode_csv(value, record_defaults=[[-1]])
numeric_val2, = tf.decode_csv(value, record_defaults=[[-1]])
numeric_batch = tf.batch_join([[numeric_val1,], [numeric_val2]], 2)
# have to create session before queue runners because they use default session
sess = create_session()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
print '\n'.join([t.name for t in threads])
for i in range(20):
print sess.run([numeric_batch])
coord.request_stop()
coord.join(threads)
Run Code Online (Sandbox Code Playgroud)
你可能会看到这样的事情:
QueueRunner(input_producer:input_producer/input_producer_EnqueueMany)
QueueRunner(input_producer:input_producer/input_producer_Close_1)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_enqueue)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_enqueue_1)
QueueRunner(batch_join/fifo_queue:batch_join/fifo_queue_Close_1)
[array([0, 1], dtype=int32)]
[array([2, 3], dtype=int32)]
[array([4, 5], dtype=int32)]
[array([6, 7], dtype=int32)]
[array([8, 9], dtype=int32)]
[array([10, 11], dtype=int32)]
[array([12, 13], dtype=int32)]
[array([14, 15], dtype=int32)]
[array([16, 17], dtype=int32)]
[array([18, 19], dtype=int32)]
Run Code Online (Sandbox Code Playgroud)
从线程的列表,你可以看到有相应的阅读操作2个线程(fifo_queue_enqueue和fifo_queue_enqueue_1所以你可以做2并行读取)
| 归档时间: |
|
| 查看次数: |
1306 次 |
| 最近记录: |