来自Flask路线的Python3 Asyncio调用

use*_*502 30 event-loop flask python-3.x python-asyncio

我想在每次执行flask路径时执行异步函数.目前我的abar函数从未执行过.你能告诉我为什么吗?非常感谢你:

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

我也尝试将一个阻塞调用放在一个单独的线程中.但它仍然没有称之为abar功能.

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)

Mar*_*ers 22

你的错误是在调用app.run(). 后者不会返回,而是运行 Flask 开发服务器。

事实上,这就是大多数 WSGI 设置的工作方式;无论是主线程是要忙调度请求,或瓶服务器导入为一个WSGI服务器的模块,并且你不能从这里开始的事件循环两种

相反,您必须在单独的线程中运行 asyncio 事件循环,然后通过asyncio.run_coroutine_threadsafe(). 请参阅文档中的协程和多线程部分了解这意味着什么。

这是一个模块的实现,该模块将运行这样一个事件循环线程,并为您提供安排协程在该循环中运行的实用程序:

import asyncio
import itertools
import threading

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        self.started = threading.Event()
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.call_later(0, self.started.set)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            try:
                shutdown_executor = loop.shutdown_default_executor()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_executor)
            asyncio.set_event_loop(None)
            loop.close()

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread

    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                _loop_thread.started.wait(1)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    """Run the coroutine in the event loop running in a separate thread

    Returns a Future, call Future.result() to get the output

    """
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
Run Code Online (Sandbox Code Playgroud)

您可以使用run_coroutine()此处定义的函数来安排异步例程。使用返回的Future实例来控制协程:

  • 得到结果Future.result()。你可以给它一个超时时间;如果超时内没有产生任何结果,协程将自动取消。
  • 您可以使用.cancelled(),.running().done()方法查询协程的状态。
  • 您可以向未来添加回调,它将在协程完成、取消或引发异常时调用(考虑到这可能会从事件循环线程而不是您调用的线程中调用run_coroutine()) )。

对于您的特定示例, whereabar()不返回任何结果,您可以忽略返回的未来,如下所示:

@app.route("/")
def notify():
    run_coroutine(abar("abar"))
    return "OK"
Run Code Online (Sandbox Code Playgroud)

请注意,在 Python 3.8 之前,您不能使用在单独线程上运行的事件循环来创建子进程!请参阅我对Python3 Flask asyncio subprocess in route hangs for backport of the Python 3.8 ThreadedChildWatcherclass 的回答,以解决此问题。


Tra*_*rry 20

您可以将一些异步功能合并到Flask应用程序中,而无需将它们完全转换为asyncio.

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)

这将阻止Flask响应,直到异步函数返回,但它仍然允许你做一些聪明的事情.我已经使用这种模式使用aiohttp并行执行许多外部请求,然后当它们完成时,我又回到传统的瓶子中进行数据处理和模板渲染.

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)

  • 这不是线程安全的,您不能简单地从任意线程使用“loop.run_until_complete()”。异步循环是*线程特定的*。任何现实生活中的 WSGI 部署都将使用线程。您必须为*每个线程*创建一个新的事件循环,而不是调用 `asyncio.get_event_loop()`。但这……太过分了。 (8认同)
  • 您的示例给出了“ RuntimeError:线程'Thread-1'中没有当前事件循环”。复制:1)我已将您的代码段保存到soexamp.py;2)运行python soexamp.py 3)比`curl localhost:5000 /` 我的flask .__ version__是'1.0.2'和aiohttp .__ version__是'3.5.4'。 (6认同)
  • 由于在生产中使用 async worker(例如 gevent、meinheld 或 eventlet)运行 Flask 是很常见的,我认为重要的是要注意这个解决方案会阻塞 gevent/meinheld/eventlet 事件循环。这反过来会否定使用它们的一些优势。 (3认同)
  • @ravimalhotra:线程不安全意味着事情可能会中断,因为多个线程正在更改相同的数据结构,除非您考虑线程。除了一些[明确记录的函数](https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading) 之外,异步事件循环实现不是线程安全的。这里的代码*不会*为每个线程创建一个新的事件循环,也不会正确地将协程传递给单个线程。请注意,我还发布了此问题的答案,可以更好地解决这些问题。 (3认同)

pgj*_*nes 15

一个更简单的解决方案(在我的偏见视图中)是从Flask 切换到Quart.如果是这样,您的代码段简化为,

import asyncio
from quart import Quart

async def abar(a):
    print(a)

app = Quart(__name__)

@app.route("/")
async def notify():
    await abar("abar")
    return "OK"

if __name__ == "__main__":
    app.run(debug=False)
Run Code Online (Sandbox Code Playgroud)

如其他答案中所述,Flask app运行是阻塞的,并且不与asyncio循环交互.另一方面,Quart是基于asyncio构建的Flask API,所以它应该如你所愿.

另外,作为更新,不再维护 Flask-Aiohttp .

  • 我很抱歉投了反对票,但是当您希望能够触发后台任务时告诉您切换整个框架的答案并没有真正的帮助 (2认同)

Mik*_*mov 5

出于同样的原因,您将看不到此打印:

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    print('Hey!')
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

loop.run_forever()永远不会被调用,因为已经指出的@dirn app.run也会阻塞。

运行全局阻止事件循环-这是您可以运行asyncio协程和任务的唯一方法,但它与运行阻止Flask应用程序(或通常与任何其他此类应用程序)不兼容。

如果要使用异步Web框架,则应选择一个异步创建的框架。例如,现在可能最受欢迎的是aiohttp

from aiohttp import web


async def hello(request):
    return web.Response(text="Hello, world")


if __name__ == "__main__":
    app = web.Application()
    app.router.add_get('/', hello)
    web.run_app(app)  # this runs asyncio event loop inside
Run Code Online (Sandbox Code Playgroud)

更新:

关于您尝试在后台线程中运行事件循环的信息。我没有做太多研究,但是似乎与胎面安全性有某种关系:许多异步对象不是线程安全的。如果您以此方式更改代码,它将起作用:

def _create_task():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)

@app.route("/")
def notify():
    worker_loop.call_soon_threadsafe(_create_task)
    return "OK"
Run Code Online (Sandbox Code Playgroud)

但是,这又是一个坏主意。这不仅很不方便,而且我想也没有多大意义:如果您要使用线程来启动异步,为什么不只在Flask中使用线程而不是异步呢?您将拥有所需的Flask和并行化。

如果我仍然不能说服您,请至少看看Flask-aiohttp项目。它与Flask api十分接近,我想您尝试做的还要更好。