Making an asynchronous task in Flask

Dar*_*ech 65 python asynchronous flask

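I am writing an application in Flask, which works really well except that WSGI is synchronous and blocking. I have one task in particular which calls out to a third-party API, and that task can take several minutes to complete. I would like to make that call (it is actually a series of calls) and let it run while control is returned to Flask.

My view looks like this: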

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = audio_class.render_audio(data=text_list)
    # do stuff
    return Response(
        mimetype='application/json',
        status=200
    )

Now, what I want to do is have the line

final_file = audio_class.render_audio()

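run and provide a callback to be executed when the method returns, while Flask continues to process requests. This is the only task I need Flask to run asynchronously, and I would like some advice on how best to implement it.

I have looked at Twisted and Klein, but I am not sure whether they are overkill, since threading might suffice. Or maybe Celery is a good choice for this?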

小智 74

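I would use Celery to handle the asynchronous task for you. You will need to install a broker to serve as your task queue (RabbitMQ and Redis are recommended).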

app.py:

import json

from flask import Flask, Response, request
from celery import Celery

broker_url = 'amqp://guest@localhost'          # Broker URL for RabbitMQ task queue

app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig')      # Your celery configurations in a celeryconfig.py

@celery.task(bind=True)
def some_long_task(self, text_list):
    # Do the long task here; it runs in the Celery worker, not in Flask.
    # audio_class is the object from the question's code.
    audio_class.render_audio(data=text_list)

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    some_long_task.delay(text_list)            # Queue the async task and pass whatever variables it needs
    return Response(
        mimetype='application/json',
        status=200
    )

Run your Flask app, and start another process to run your Celery worker.

$ celery worker -A app.celery --loglevel=debug

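I would also refer to Miguel Grinberg's write-up for a more in-depth guide to using Celery with Flask.

  • Celery is a solid solution, but it is not lightweight and takes a while to set up. (2 upvotes)
  • The Celery start command no longer works. Apparently `-A` as an argument of the worker sub-command was removed in version 5.0: `The support for this usage was removed in Celery 5.0` (2 upvotes)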

Tom*_*iak 28

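You can also try using multiprocessing.Process with daemon=True; the process.start() method does not block, so you can return a response/status to the caller immediately while your expensive function executes in the background.

I ran into a similar problem while working with the falcon framework, and using a daemon process helped.

You would need to do the following: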

import time
from multiprocessing import Process

@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
    ...
    heavy_process = Process(  # Create a daemonic process with heavy "my_func"
        target=my_func,
        daemon=True
    )
    heavy_process.start()
    return Response(
        mimetype='application/json',
        status=200
    )

# Define some heavy function
def my_func():
    time.sleep(10)
    print("Process finished")

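You should get a response immediately and, after 10 seconds, you should see the printed message in the console.

NOTE: Keep in mind that daemonic processes are not allowed to spawn any child processes.

  • I don't understand what you mean. The author is talking about an asynchronous task, which is a task that runs "in the background", so that the caller does not block until it gets a response. Spawning a daemon process is one example of how such asynchronism can be achieved. (6 upvotes)
  • For example, you could have `my_func` send a response/heartbeat to some other endpoint. Or you could set up and share a message queue through which you can communicate with `my_func`. (2 upvotes)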
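
The message-queue idea from the last comment could look roughly like the sketch below. It is only an illustration, not the answerer's code: here `my_func` takes a hypothetical `queue` argument and `steps` count, `start_heavy_process` is a made-up helper name, and the short `time.sleep` stands in for the real work.

```python
import time
from multiprocessing import Process, Queue


def my_func(queue, steps=3):
    """Stand-in for the heavy job: report progress, then a final message."""
    for step in range(1, steps + 1):
        time.sleep(0.1)  # placeholder for real work
        queue.put(("heartbeat", step))
    queue.put(("done", steps))


def start_heavy_process(queue):
    """Start my_func in a daemonic process and return without waiting."""
    process = Process(target=my_func, args=(queue,), daemon=True)
    process.start()
    return process


if __name__ == "__main__":
    messages = Queue()
    start_heavy_process(messages)
    # The caller is free to do other work; here it simply drains the queue.
    while True:
        kind, value = messages.get()
        print(kind, value)
        if kind == "done":
            break
```

In a Flask view you would call something like `start_heavy_process` and return the response right away; whichever part of the application holds the queue can read the heartbeats later.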

Jur*_*dom 11

Threading is another possible solution. Although the Celery based solution is better for applications at scale, if you are not expecting too much traffic on the endpoint in question, threading is a viable alternative.

This solution is based on Miguel Grinberg's PyCon 2016 Flask at Scale presentation, specifically slide 41 in his slide deck. His code is also available on GitHub for those interested in the original source.

From a user perspective the code works as follows:

  1. You make a call to the endpoint that performs the long running task.
  2. This endpoint returns 202 Accepted with a link to check on the task status.
  3. Calls to the status link return 202 while the task is still running, and return 200 (and the result) when the task is complete.
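
From the client's side, those three steps amount to a polling loop. The sketch below is only an illustration under assumptions: `wait_for_task` and `fetch_status` are hypothetical names, and `fetch_status` stands for whatever performs the GET on the status link and returns a `(status_code, body)` pair. The demo uses canned replies instead of a real server.

```python
import time


def wait_for_task(fetch_status, interval=1.0, max_attempts=30):
    """Poll until the status call stops answering 202 Accepted.

    fetch_status is a hypothetical callable returning (status_code, body).
    """
    for _ in range(max_attempts):
        status_code, body = fetch_status()
        if status_code != 202:
            # Anything other than 202 is the task's final response.
            return status_code, body
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")


if __name__ == "__main__":
    # Fake server replies: two "still running" answers, then the result.
    replies = iter([(202, ""), (202, ""), (200, "The answer is: demo")])
    print(wait_for_task(lambda: next(replies), interval=0.1))
```

Against a real deployment, `fetch_status` would wrap an HTTP GET to the URL taken from the `Location` header of the initial 202 response.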

To convert an API call to a background task, simply add the @async_api decorator.

Here is a fully contained example:

from flask import Flask, g, abort, current_app, request, url_for
from werkzeug.exceptions import HTTPException, InternalServerError
from flask_restful import Resource, Api
from datetime import datetime
from functools import wraps
import threading
import time
import uuid

tasks = {}

app = Flask(__name__)
api = Api(app)


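# Note: before_first_request was deprecated in Flask 2.2 and removed in 2.3;
# on newer versions, start this thread during app setup instead.
@app.before_first_request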
def before_first_request():
    """Start a background thread that cleans up old tasks."""
    def clean_old_tasks():
        """
        This function cleans up old tasks from our in-memory data structure.
        """
        global tasks
        while True:
            # Only keep tasks that are running or that finished less than 5
            # minutes ago.
            five_min_ago = datetime.timestamp(datetime.utcnow()) - 5 * 60
            tasks = {task_id: task for task_id, task in tasks.items()
                     if 'completion_timestamp' not in task or task['completion_timestamp'] > five_min_ago}
            time.sleep(60)

    if not current_app.config['TESTING']:
        thread = threading.Thread(target=clean_old_tasks)
        thread.start()


def async_api(wrapped_function):
    @wraps(wrapped_function)
    def new_function(*args, **kwargs):
        def task_call(flask_app, environ):
            # Create a request context similar to that of the original request
            # so that the task can have access to flask.g, flask.request, etc.
            with flask_app.request_context(environ):
                try:
                    tasks[task_id]['return_value'] = wrapped_function(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['return_value'] = current_app.handle_http_exception(e)
                except Exception as e:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['return_value'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise
                finally:
                    # We record the time of the response, to help in garbage
                    # collecting old tasks
                    tasks[task_id]['completion_timestamp'] = datetime.timestamp(datetime.utcnow())

                    # close the database session (if any)

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task_thread': threading.Thread(
            target=task_call, args=(current_app._get_current_object(),
                               request.environ))}
        tasks[task_id]['task_thread'].start()

        # Return a 202 response, with a link that the client can use to
        # obtain task status
        print(url_for('gettaskstatus', task_id=task_id))
        return 'accepted', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
    return new_function


class GetTaskStatus(Resource):
    def get(self, task_id):
        """
        Return status about an asynchronous task. If this request returns a 202
        status code, it means that task hasn't finished yet. Else, the response
        from the task is returned.
        """
        task = tasks.get(task_id)
        if task is None:
            abort(404)
        if 'return_value' not in task:
            return '', 202, {'Location': url_for('gettaskstatus', task_id=task_id)}
        return task['return_value']


class CatchAll(Resource):
    @async_api
    def get(self, path=''):
        # perform some intensive processing
        print("starting processing task, path: '%s'" % path)
        time.sleep(10)
        print("completed processing task, path: '%s'" % path)
        return f'The answer is: {path}'


api.add_resource(CatchAll, '/<path:path>', '/')
api.add_resource(GetTaskStatus, '/status/<task_id>')


if __name__ == '__main__':
    app.run(debug=True)



ana*_*thi 8

Flask 2.0

Flask 2.0 now supports async routes. You can use the httpx library together with asyncio coroutines for this. You could change your code along the lines below:

@app.route('/render/<id>', methods=['POST'])
async def render_script(id=None):
    ...
    data = json.loads(request.data)
    text_list = data.get('text_list')
    final_file = await asyncio.gather(
        audio_class.render_audio(data=text_list),
        do_other_stuff_function()
    )
    # Make sure the coroutines do not contain any blocking calls.
    return Response(
        mimetype='application/json',
        status=200
    )

The above is just pseudocode, but you can look into how asyncio works with Flask 2.0, and for HTTP calls you can use httpx. Also make sure the coroutines only perform I/O-bound work.
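
To make the `asyncio.gather` part of the pseudocode concrete, here is a small stand-alone sketch that runs without Flask. Everything in it is an assumption for illustration: `render_audio`, `do_other_stuff` and `handle_request` are hypothetical stand-ins, and `asyncio.sleep` takes the place of a real non-blocking API call such as one made with httpx.

```python
import asyncio


async def render_audio(text_list):
    """Hypothetical stand-in for an awaitable, I/O-bound render call."""
    await asyncio.sleep(0.2)  # placeholder for a non-blocking API call
    return f"rendered {len(text_list)} lines"


async def do_other_stuff():
    """Hypothetical second I/O-bound coroutine."""
    await asyncio.sleep(0.1)
    return "other stuff done"


async def handle_request(text_list):
    # Both coroutines run concurrently; results come back in argument order.
    # The handler still waits here until both of them have finished.
    return await asyncio.gather(render_audio(text_list), do_other_stuff())


if __name__ == "__main__":
    print(asyncio.run(handle_request(["a", "b"])))
```

Note that awaiting `gather` overlaps the two waits, but the handler does not return until both coroutines are done, so the response is not sent any earlier than the slowest call.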

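  • While your statement about `async` support in Flask >=2.0 is correct, the asker's problem cannot be solved with this approach, because Flask does not support background tasks (as they also [state](https://flask.palletsprojects.com/en/2.0.x/async-await/#background-tasks) in their docs). (5 upvotes)
  • There is also Quart, which follows the same structure as Flask. Quart is recommended whenever you want to use the full potential of async code. Flask: https://flask.palletsprojects.com/en/2.0.x/async-await/ Quart: https://pgjones.gitlab.io/quart/index.html (2 upvotes)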