在烧瓶中返回立即响应,但在新线程中完成处理

Chu*_*lar 2 python multithreading flask

我有一个从 10 秒开始的过程,但是在 10 秒之后,我不需要将信息返回给用户而是将其写在 dynamoDB 上,这就是为什么我希望用户不必等待 10 秒。相反,我希望在发布请求后立即得到“成功”响应。

我读了几篇文章,在这篇文章中,答案是使用 Teardown Callback,但没有示例。

我读了这个,但这对我的问题没有帮助。

我当然阅读了拆卸回调这种模式,但我不知道如何以另一种方式使用它。

我的代码如下所示:

@app.route('/ocr/read_image', methods=['POST'])
def get_text():    
    return jsonify('Success')

@app.teardown_request
def teardown_request(response):        
    time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

它实际上在 10 秒后返回“成功”消息。

有没有办法在 10 秒之前返回“成功”消息?

我一直在读到芹菜可能是可行的,但如果可以的话,我很想避免它。

有谁知道怎么做?

Mat*_*ory 5

线程在这里也不必要地使事情复杂化,恕我直言。要在不引入线程的情况下完成您想做的事情,您可以挂钩WSGI 响应关闭方法。最简单的方法是使用 werkzeug ClosingIterator助手。

import traceback
from werkzeug.wsgi import ClosingIterator

class AfterThisResponse:
    def __init__(self, app=None):
        self.callbacks = []
        if app:
            self.init_app(app)

    def __call__(self, callback):
        self.callbacks.append(callback)
        return callback

    def init_app(self, app):
        # install extensioe
        app.after_this_response = self

        # install middleware
        app.wsgi_app = AfterThisResponseMiddleware(app.wsgi_app, self)

    def flush(self):
        try:
            for fn in self.callbacks:
                try:
                    fn()
                except Exception:
                    traceback.print_exc()
        finally:
            self.callbacks = []

class AfterThisResponseMiddleware:
    def __init__(self, application, after_this_response_ext):
        self.application = application
        self.after_this_response_ext = after_this_response_ext

    def __call__(self, environ, start_response):
        iterator = self.application(environ, start_response)
        try:
            return ClosingIterator(iterator, [self.after_this_response_ext.flush])
        except Exception:
            traceback.print_exc()
            return iterator
Run Code Online (Sandbox Code Playgroud)

然后你可以像这样使用这个扩展:

import flask
import time
app = flask.Flask("after_response")
AfterThisResponse(app)

@app.route("/")
def home():
    @app.after_this_response
    def post_process():
        time.sleep(2)
        print("after_response")

    return "Success!\n"
Run Code Online (Sandbox Code Playgroud)

当您 curl 时,您将立即看到成功,然后在 2 秒后在您的日志中看到您的“after_response”消息:

127.0.0.1 - - [25/Jun/2018 16:15:01] "GET / HTTP/1.1" 200 -
after_response
Run Code Online (Sandbox Code Playgroud)

此解决方案是根据我的回答改编的摘要:


Chu*_*lar 3

正如 Ardaglio 所说,最好的方法是使用多线程。

我没有使用 Celery,因为我认为它非常复杂,而我的问题对它来说很简单。

所以,我正在使用线程:

from threading import Thread

@app.route('/ocr/read_image', methods=['POST'])
def get_text():    
    Thread(target=continue_processing).start()
    return jsonify('Success')

def continue_processing():
    time.sleep(10)
    print('Hi')
Run Code Online (Sandbox Code Playgroud)

但是,你必须小心。我使用 Keras 和 Tensorflow 作为后端,如果你这样使用它,你会得到一个很好的值错误ValueError: Tensor Tensor()is not an element of this graph.

因此,为了避免在线程中出现这种情况,您必须在模型创建后保存图表:

GRAPH = tf.get_default_graph()

然后你必须以这种方式在异步进程中使用它:

with GRAPH.as_default():
    do something with your model 
Run Code Online (Sandbox Code Playgroud)

希望它可以帮助别人。