用于 ML 预测的 Celery 任务在执行中挂起

mat*_*tox 6 machine-learning redis celery flask tensorflow

我正在尝试创建一个 Web 应用程序,该应用程序接收来自 POST 请求的输入,并根据该输入提供一些 ML 预测。

由于预测模型相当繁重,我不希望用户等待计算完成。相反,我将大量计算委托给 Celery 任务,用户可以稍后检查结果。

我正在使用带有 Celery、Redis 和 Flower 的简单 Flask 应用程序。

我的view.py

@ns.route('predict/')
class Predict(Resource):
    ...
    def post(self):
        ...
        do_categorize(data)
        return jsonify(success=True)
Run Code Online (Sandbox Code Playgroud)

我的tasks.py文件看起来像这样:

from ai.categorizer import Categorizer
categorizer = Categorizer(
    model_path='category_model.h5',
    tokenizer_path='tokenize.joblib',
    labels_path='labels.joblib'
)


@task()
def do_categorize(data):
    result = categorizer.predict(data)
    print(result)
    # Write result to the DB
    ...
Run Code Online (Sandbox Code Playgroud)

我在课堂predict()上的方法Categorizer

def predict(self, value):
    K.set_session(self.sess)
    with self.sess.as_default():
        with self.graph.as_default():
            prediction = self.model.predict(np.asarray([value], dtype='int64'))
            return prediction
Run Code Online (Sandbox Code Playgroud)

我像这样运行芹菜:

celery worker -A app.celery --loglevel=DEBUG
Run Code Online (Sandbox Code Playgroud)

最近几天我遇到的问题是调用categorizer.predict(data)在执行过程中挂起。

categorizer.predict(data)我尝试在 post 方法内部运行并且它有效。但如果我将它放在 Celery 任务中,它就会停止工作。没有控制台日志,如果我尝试调试它,它就会冻结.predict()

我的问题:

  • 我该如何解决这个问题?
  • Worker 是否有内存、CPU 限制?
  • Celery 任务是进行如此繁重计算的“正确”方法吗?
  • 我该如何调试这个问题?我究竟做错了什么?
  • 在文件顶部初始化模型是否正确?

mat*_*tox 8

感谢这个问题,我找到了问题的答案:

\n\n

事实证明,对于 Keras 来说,使用Threads池化比默认使用更好Process

\n\n

对我来说幸运的是,Celery 4.4Threads池化在不久前被重新引入。\n您可以在Celery 4.4 变更日志中阅读更多内容:

\n\n
\n

线程任务池

\n\n

我们使用并发.futures.ThreadPoolExecutor 重新引入了线程任务池。

\n\n

之前的线程任务池是实验性的。此外,它基于已过时的线程池包。

\n\n

您可以通过将worker_pool设置为\xe2\x80\x98threads`或将\xe2\x80\x93pool线程传递给celery工作命令来使用新的线程任务池。

\n
\n\n

现在您可以使用线程而不是进程进行池化。

\n\n
celery worker -A your_application --pool threads --loginfo=INFO\n
Run Code Online (Sandbox Code Playgroud)\n\n

如果你不能使用最新的 Celery 版本,你可以使用gevent包:

\n\n
pip install gevent\ncelery worker -A your_application --pool gevent --loginfo=INFO\n
Run Code Online (Sandbox Code Playgroud)\n