凯拉斯预测芹菜任务不归队

pX0*_*X0r 5 django redis celery keras tensorflow

同步调用时,遵循Keras函数(预测)工作

pred = model.predict(x)
Run Code Online (Sandbox Code Playgroud)

但是,从异步任务队列(Celery)中调用时,它不起作用。Keras预测函数在异步调用时不返回任何输出。

堆栈是:Django,Celery,Redis,Keras,TensorFlow

小智 13

我遇到了这个完全相同的问题,伙计,这是一个兔子洞。想在这里发布我的解决方案,因为它可能会为某人节省一天的工作:

TensorFlow 线程特定的数据结构

在 TensorFlow 中,当您调用model.predict(或keras.models.load_model、 或keras.backend.clear_session或几乎任何其他与 TensorFlow 后端交互的函数)时,有两个关键数据结构在幕后工作:

在没有深入挖掘的情况下,文档中没有明确说明的是session 和 graph 都是当前线程的属性。请参阅此处此处的API 文档。

在不同线程中使用 TensorFlow 模型

想要加载一次模型然后.predict()多次调用它是很自然的:

from keras.models import load_model

MY_MODEL = load_model('path/to/model/file')

def some_worker_function(inputs):
    return MY_MODEL.predict(inputs)

Run Code Online (Sandbox Code Playgroud)

在像 Celery 这样的网络服务器或工作池上下文中,这意味着您将在导入包含该load_model行的模块时加载模型,然后将执行不同的线程some_worker_function,在包含 Keras 模型的全局变量上运行 predict 。但是,尝试在不同线程中加载的模型上运行 predict 会产生“张量不是该图的元素”错误。感谢几篇触及该主题的 SO 帖子,例如ValueError: Tensor Tensor(...) is not an element of this graph。使用全局变量 keras 模型时。为了让它工作,你需要坚持使用的 TensorFlow 图——正如我们之前看到的,图是当前线程的一个属性。更新后的代码如下所示:

from keras.models import load_model
import tensorflow as tf

MY_MODEL = load_model('path/to/model/file')
MY_GRAPH = tf.get_default_graph()

def some_worker_function(inputs):
    with MY_GRAPH.as_default():
        return MY_MODEL.predict(inputs)
Run Code Online (Sandbox Code Playgroud)

这里有点令人惊讶的转折是:如果您使用Threads,上面的代码就足够了,但如果您使用Processes ,则无限期地挂起。默认情况下,Celery 使用进程来管理其所有工作池。所以在这一点上,Celery 的事情仍然没有奏效。

为什么这只适用于Threads?

在 Python 中,Threads 与父进程共享相同的全局执行上下文。来自Python _thread 文档

该模块提供用于处理多个线程(也称为轻量级进程或任务)的低级原语——多个控制线程共享其全局数据空间。

由于线程不是实际的独立进程,它们使用相同的 Python 解释器,因此受到臭名昭著的全局互操作者锁 (GIL) 的约束。也许对于本次调查更重要的是,他们与父级共享全局数据空间。

与此相反,Processes 是由程序产生的实际新进程。这意味着:

  • 新的 Python 解释器实例(没有 GIL)
  • 全局地址空间重复

注意这里的区别。虽然Threads 可以访问共享的单个全局 Session 变量(存储在tensorflow_backendKeras 模块内部),但Processes 具有 Session 变量的副本。

我对这个问题最好的理解是 Session 变量应该代表客户端(进程)和 TensorFlow 运行时之间的唯一连接,但是由于在分叉过程中被复制,这个连接信息没有得到适当的调整。这会导致 TensorFlow 在尝试使用在不同进程中创建的会话时挂起。如果有人更深入地了解 TensorFlow 的幕后工作原理,我很想听听!

解决方案/变通方法

我调整了 Celery,使其使用Threads 而不是Processes 进行池化。这种方法有一些缺点(参见上面的 GIL 注释),但这允许我们只加载一次模型。无论如何,我们并不是真正受 CPU 限制,因为 TensorFlow 运行时最大化了所有 CPU 内核(它可以避开 GIL,因为它不是用 Python 编写的)。你必须为 Celery 提供一个单独的库来进行基于线程的池化;文档建议了两个选项:geventeventlet。然后您通过--pool命令行参数将您选择的库传递给工作程序。

或者,似乎(正如您已经发现@pX0r)其他 Keras 后端(例如 Theano)没有此问题。这是有道理的,因为这些问题与 TensorFlow 实现细节密切相关。我个人还没有尝试过 Theano,所以你的里程可能会有所不同。

我知道这个问题是前一段时间发布的,但问题仍然存在,所以希望这会对某人有所帮助!