doh*_*tob 5 python parallel-processing tensorflow
考虑以下张量流代码片段:
import time
import numpy as np
import tensorflow as tf
def fn(i):
# do some junk work
for _ in range(100):
i ** 2
return i
n = 1000
n_jobs = 8
stuff = np.arange(1, n + 1)
eager = False
t0 = time.time()
if eager:
tf.enable_eager_execution()
res = tf.map_fn(fn, stuff, parallel_iterations=n_jobs)
if not eager:
with tf.Session() as sess:
res = sess.run(res)
print(sum(res))
else:
print(sum(res))
dt = time.time() - t0
print("(eager=%s) Took %ims" % (eager, dt * 1000))
Run Code Online (Sandbox Code Playgroud)
如果使用eager = True它运行,则比使用 运行时慢 10 倍eager = False。我做了一些打印,发现在eager = True模式下,map_fn调用是顺序运行的,而不是生成 8 个并行线程。
所以我的问题是如何map_fn在急切执行模式下使用(并行迭代> 1)?
(我为此使用了 TF 2.3,不要指望新版本会产生相同的结果。)
\n这不仅仅是对OP问题的回答,这是它的延伸,说明了为什么其他答案没有解决真正的问题,因为tf.function不足以强制并行。
首先,使用tf.function并不强制并行化。它强制跟踪和图形的构建,这只发生一次,因此,time.sleep()其他答案中使用的仅在第一次需要跟踪时运行,这就是为什么您会看到tf.function. 但改变后你仍然看不出有什么不同parallel_iterations。
让我们使用一个py_fuction来看看差异:
def op(x):\n time.sleep(1)\n return 2 * x.numpy()\n\ndef op_tf(x):\n print(\'Tracing\')\n return tf.py_function(op, [x], Tout=tf.int32)\nRun Code Online (Sandbox Code Playgroud)\n不使用装饰器(或直接调用)tf.function任何调用op_tf将始终打印“Tracing”(尽管在本例中不是跟踪)
In [57]: op_tf(1)\nTracing\nOut[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [58]: op_tf(1)\nTracing\nOut[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\nRun Code Online (Sandbox Code Playgroud)\n和tf.function只看到一次跟踪(如果我们使用相同的参数):
In [57]: op_tf(1)\nTracing\nOut[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [58]: op_tf(1)\nTracing\nOut[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\nRun Code Online (Sandbox Code Playgroud)\n发生这种情况是因为该函数必须为每个新参数构建一个新图,如果我们直接传递签名,我们就可以避免这种情况的发生:
\nIn [67]: @tf.function\n ...: def op_tf(x):\n ...: print("Tracing")\n ...: return tf.py_function(op, [x], Tout=tf.int32)\n ...: \n\nIn [68]: op_tf(1)\nTracing\nOut[68]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [69]: op_tf(2)\nTracing\nOut[69]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\nIn [70]: op_tf(3)\nTracing\nOut[70]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\n\nIn [71]: op_tf(3)\nOut[71]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\nRun Code Online (Sandbox Code Playgroud)\n如果我们首先调用该方法,也会发生同样的情况get_concrete_function:
In [73]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n ...: def op_tf(x):\n ...: print("Tracing")\n ...: return tf.py_function(op, [x], Tout=tf.int32)\n ...: \n ...: \n\nIn [74]: op_tf(1)\nTracing\nOut[74]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [75]: op_tf(2)\nOut[75]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\nIn [76]: op_tf(3)\nOut[76]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\nRun Code Online (Sandbox Code Playgroud)\n然后,答案声称只需添加tf.function并不完全正确:
In [79]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n ...: def op_tf(x):\n ...: print("Tracing")\n ...: return tf.py_function(op, [x], Tout=tf.int32)\n ...: \n ...: \n\nIn [80]: op_tf = op_tf.get_concrete_function()\nTracing\n\nIn [81]: op_tf(1)\nOut[81]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [82]: op_tf(2)\nOut[82]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\nRun Code Online (Sandbox Code Playgroud)\n相比之下,如果用于 sleep 和 print 的 python 指令位于 py_function 内部,则它们将始终被调用:
\nIn [84]: def op(x):\n ...: print("sleep")\n ...: time.sleep(0.1)\n ...: return 1.\n ...: \n\nIn [85]: x = tf.ones(shape=(10,))\n\nIn [86]: _ = tf.map_fn(op, x, parallel_iterations=10)\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\n\nIn [87]: @tf.function\n ...: def my_map(*args, **kwargs):\n ...: return tf.map_fn(*args, **kwargs)\n ...: \n\nIn [88]: my_map(op, x, parallel_iterations=10)\nsleep\nOut[88]: <tf.Tensor: shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>\nRun Code Online (Sandbox Code Playgroud)\n现在,我们已经清楚函数的跟踪给我们带来了一些困惑,让我们删除打印来计时调用:
\nIn [96]: x = tf.ones(shape=(10,), dtype=tf.int32)\n\nIn [97]: def op(x):\n ...: print("sleep")\n ...: time.sleep(0.1)\n ...: return 2 * x.numpy()\n ...: \n\nIn [98]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n ...: def op_tf(x):\n ...: print("Tracing")\n ...: return tf.py_function(op, [x], Tout=tf.int32)\n ...: \n\nIn [99]: _ = my_map(op_tf, x, parallel_iterations=1)\nTracing\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nRun Code Online (Sandbox Code Playgroud)\n运行以下脚本并使用张量板,我们可以准确地看到发生了什么:
\nIn [106]: def op(x):\n ...: time.sleep(0.1)\n ...: return 2 * x.numpy()\n ...: \n\nIn [107]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n ...: def op_tf(x):\n ...: return tf.py_function(op, [x], Tout=tf.int32)\n ...: \n\nIn [108]: %timeit tf.map_fn(op_tf, x, parallel_iterations=1)\n1.02 s \xc2\xb1 554 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n\nIn [109]: %timeit tf.map_fn(op_tf, x, parallel_iterations=10)\n1.03 s \xc2\xb1 509 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\nRun Code Online (Sandbox Code Playgroud)\n\n它py_function有效地使用了多个线程,但不是并行的。我们parallel_iterations=1得到了类似的东西\n
如果我们在脚本的开头添加以下内容
\nimport tensorflow as tf\nimport time\nfrom datetime import datetime\n\nstamp = datetime.now().strftime("%Y%m%d-%H%M%S")\nlogdir = \'logs/func/%s\' % stamp\n\n# Start tracing. \noptions = tf.profiler.experimental.ProfilerOptions(\n host_tracer_level=3, python_tracer_level=1, device_tracer_level=1, delay_ms=None\n)\n\ntf.profiler.experimental.start(logdir, options = options)\n\ndef op(x):\n x = x.numpy()\n start = time.time()\n\n while time.time() < start + x / 100:\n x = (2 * x) % 123\n\n return x\n\n@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])\ndef op_tf(x):\n return tf.py_function(op, [x], Tout=tf.int32, name=\'op\')\n\n@tf.function(input_signature=[tf.TensorSpec([None], tf.int32)])\ndef my_map(x):\n return tf.map_fn(op_tf, x, parallel_iterations=16)\n\nx = tf.ones(100, tf.int32)\nprint(my_map(x))\n\ntf.profiler.experimental.stop()\nRun Code Online (Sandbox Code Playgroud)\n\n因此,此时如果我们正确设置内部/内部线程,我们就只能获得某种形式的并行执行。
\n如果我们完全禁用急切执行:
\ntf.config.threading.set_inter_op_parallelism_threads(1)\ntf.config.threading.set_intra_op_parallelism_threads(1)\nRun Code Online (Sandbox Code Playgroud)\n我们现在可以在 Tensorboard 中看到并行执行:\n
如果我们将线程内/线程间和parallel_iterations设置为1,我们就会得到之前的行为:\n
我希望这有助于澄清在检查完全并行性中的作用tf.function。
| 归档时间: |
|
| 查看次数: |
3019 次 |
| 最近记录: |