启用急切执行时如何运行并行map_fn

doh*_*tob 5 python parallel-processing tensorflow

考虑以下张量流代码片段:

import time
import numpy as np
import tensorflow as tf

def fn(i):
    # do some junk work
    for _ in range(100):
        i ** 2
    return i

n = 1000
n_jobs = 8
stuff = np.arange(1, n + 1)
eager = False
t0 = time.time()
if eager:
    tf.enable_eager_execution()
res = tf.map_fn(fn, stuff, parallel_iterations=n_jobs)
if not eager:
    with tf.Session() as sess:
        res = sess.run(res)
        print(sum(res))
else:
    print(sum(res))
dt = time.time() - t0
print("(eager=%s) Took %ims" % (eager, dt * 1000))
Run Code Online (Sandbox Code Playgroud)

如果使用eager = True它运行,则比使用 运行时慢 10 倍eager = False。我做了一些打印,发现在eager = True模式下,map_fn调用是顺序运行的,而不是生成 8 个并行线程。

问题

所以我的问题是如何map_fn在急切执行模式下使用(并行迭代> 1)?

Jor*_*ona 4

(我为此使用了 TF 2.3,不要指望新版本会产生相同的结果。)

\n

这不仅仅是对OP问题的回答,这是它的延伸,说明了为什么其他答案没有解决真正的问题,因为tf.function不足以强制并行。

\n
\n

首先,使用tf.function并不强制并行化。它强制跟踪和图形的构建,这只发生一次,因此,time.sleep()其他答案中使用的仅在第一次需要跟踪时运行,这就是为什么您会看到tf.function. 但改变后你仍然看不出有什么不同parallel_iterations

\n

让我们使用一个py_fuction来看看差异:

\n
def op(x):\n    time.sleep(1)\n    return 2 * x.numpy()\n\ndef op_tf(x):\n    print(\'Tracing\')\n    return tf.py_function(op, [x], Tout=tf.int32)\n
Run Code Online (Sandbox Code Playgroud)\n

不使用装饰器(或直接调用)tf.function任何调用op_tf将始终打印“Tracing”(尽管在本例中不是跟踪)

\n
In [57]: op_tf(1)\nTracing\nOut[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [58]: op_tf(1)\nTracing\nOut[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n
Run Code Online (Sandbox Code Playgroud)\n

tf.function只看到一次跟踪(如果我们使用相同的参数):

\n
In [57]: op_tf(1)\nTracing\nOut[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [58]: op_tf(1)\nTracing\nOut[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n
Run Code Online (Sandbox Code Playgroud)\n

发生这种情况是因为该函数必须为每个新参数构建一个新图,如果我们直接传递签名,我们就可以避免这种情况的发生:

\n
In [67]: @tf.function\n    ...: def op_tf(x):\n    ...:     print("Tracing")\n    ...:     return tf.py_function(op, [x], Tout=tf.int32)\n    ...: \n\nIn [68]: op_tf(1)\nTracing\nOut[68]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [69]: op_tf(2)\nTracing\nOut[69]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\nIn [70]: op_tf(3)\nTracing\nOut[70]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\n\nIn [71]: op_tf(3)\nOut[71]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\n
Run Code Online (Sandbox Code Playgroud)\n

如果我们首先调用该方法,也会发生同样的情况get_concrete_function

\n
In [73]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n    ...: def op_tf(x):\n    ...:     print("Tracing")\n    ...:     return tf.py_function(op, [x], Tout=tf.int32)\n    ...: \n    ...: \n\nIn [74]: op_tf(1)\nTracing\nOut[74]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [75]: op_tf(2)\nOut[75]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\nIn [76]: op_tf(3)\nOut[76]: <tf.Tensor: shape=(), dtype=int32, numpy=6>\n
Run Code Online (Sandbox Code Playgroud)\n

然后,答案声称只需添加tf.function并不完全正确:

\n
In [79]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n    ...: def op_tf(x):\n    ...:     print("Tracing")\n    ...:     return tf.py_function(op, [x], Tout=tf.int32)\n    ...: \n    ...: \n\nIn [80]: op_tf = op_tf.get_concrete_function()\nTracing\n\nIn [81]: op_tf(1)\nOut[81]: <tf.Tensor: shape=(), dtype=int32, numpy=2>\n\nIn [82]: op_tf(2)\nOut[82]: <tf.Tensor: shape=(), dtype=int32, numpy=4>\n\n
Run Code Online (Sandbox Code Playgroud)\n

相比之下,如果用于 sleep 和 print 的 python 指令位于 py_function 内部,则它们将始终被调用:

\n
In [84]: def op(x):\n    ...:     print("sleep")\n    ...:     time.sleep(0.1)\n    ...:     return 1.\n    ...: \n\nIn [85]: x = tf.ones(shape=(10,))\n\nIn [86]: _ = tf.map_fn(op, x, parallel_iterations=10)\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\n\nIn [87]: @tf.function\n    ...: def my_map(*args, **kwargs):\n    ...:     return tf.map_fn(*args, **kwargs)\n    ...: \n\nIn [88]: my_map(op, x, parallel_iterations=10)\nsleep\nOut[88]: <tf.Tensor: shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>\n
Run Code Online (Sandbox Code Playgroud)\n

现在,我们已经清楚函数的跟踪给我们带来了一些困惑,让我们删除打印来计时调用:

\n
In [96]: x = tf.ones(shape=(10,), dtype=tf.int32)\n\nIn [97]: def op(x):\n    ...:     print("sleep")\n    ...:     time.sleep(0.1)\n    ...:     return 2 * x.numpy()\n    ...: \n\nIn [98]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n    ...: def op_tf(x):\n    ...:     print("Tracing")\n    ...:     return tf.py_function(op, [x], Tout=tf.int32)\n    ...: \n\nIn [99]: _ = my_map(op_tf, x, parallel_iterations=1)\nTracing\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\nsleep\n
Run Code Online (Sandbox Code Playgroud)\n

运行以下脚本并使用张量板,我们可以准确地看到发生了什么:

\n
In [106]: def op(x):\n     ...:     time.sleep(0.1)\n     ...:     return 2 * x.numpy()\n     ...: \n\nIn [107]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])\n     ...: def op_tf(x):\n     ...:     return tf.py_function(op, [x], Tout=tf.int32)\n     ...: \n\nIn [108]: %timeit tf.map_fn(op_tf, x, parallel_iterations=1)\n1.02 s \xc2\xb1 554 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n\nIn [109]: %timeit tf.map_fn(op_tf, x, parallel_iterations=10)\n1.03 s \xc2\xb1 509 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 1 loop each)\n
Run Code Online (Sandbox Code Playgroud)\n

我们在 Tensorboard 中得到以下内容:\n在此输入图像描述

\n

py_function有效地使用了多个线程,但不是并行的。我们parallel_iterations=1得到了类似的东西\n在此输入图像描述

\n

如果我们在脚本的开头添加以下内容

\n
import tensorflow as tf\nimport time\nfrom datetime import datetime\n\nstamp = datetime.now().strftime("%Y%m%d-%H%M%S")\nlogdir = \'logs/func/%s\' % stamp\n\n# Start tracing.                                                                                                                                                                              \noptions = tf.profiler.experimental.ProfilerOptions(\n    host_tracer_level=3, python_tracer_level=1, device_tracer_level=1, delay_ms=None\n)\n\ntf.profiler.experimental.start(logdir, options = options)\n\ndef op(x):\n    x = x.numpy()\n    start = time.time()\n\n    while time.time() < start + x / 100:\n        x = (2 * x) % 123\n\n    return x\n\n@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])\ndef op_tf(x):\n    return tf.py_function(op, [x], Tout=tf.int32, name=\'op\')\n\n@tf.function(input_signature=[tf.TensorSpec([None], tf.int32)])\ndef my_map(x):\n    return tf.map_fn(op_tf, x, parallel_iterations=16)\n\nx = tf.ones(100, tf.int32)\nprint(my_map(x))\n\ntf.profiler.experimental.stop()\n
Run Code Online (Sandbox Code Playgroud)\n

我们强制 TF 使用单个线程来进行所有图形计算:\n在此输入图像描述

\n

因此,此时如果我们正确设置内部/内部线程,我们就只能获得某种形式的并行执行。

\n

如果我们完全禁用急切执行:

\n
tf.config.threading.set_inter_op_parallelism_threads(1)\ntf.config.threading.set_intra_op_parallelism_threads(1)\n
Run Code Online (Sandbox Code Playgroud)\n

我们现在可以在 Tensorboard 中看到并行执行:\n在此输入图像描述

\n

如果我们将线程内/线程间和parallel_iterations设置为1,我们就会得到之前的行为:\n在此输入图像描述

\n

我希望这有助于澄清在检查完全并行性中的作用tf.function

\n