use*_*502 30 event-loop flask python-3.x python-asyncio
我想在每次执行flask路径时执行异步函数.目前我的abar函数从未执行过.你能告诉我为什么吗?非常感谢你:
import asyncio
from flask import Flask
async def abar(a):
print(a)
loop = asyncio.get_event_loop()
app = Flask(__name__)
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=loop)
return "OK"
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我也尝试将一个阻塞调用放在一个单独的线程中.但它仍然没有称之为abar功能.
import asyncio
from threading import Thread
from flask import Flask
async def abar(a):
print(a)
app = Flask(__name__)
def start_worker(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))
@app.route("/")
def notify():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
return "OK"
if __name__ == "__main__":
worker.start()
app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)
Mar*_*ers 22
你的错误是在调用app.run(). 后者不会返回,而是运行 Flask 开发服务器。
事实上,这就是大多数 WSGI 设置的工作方式;无论是主线程是要忙调度请求,或瓶服务器导入为一个WSGI服务器的模块,并且你不能从这里开始的事件循环两种。
相反,您必须在单独的线程中运行 asyncio 事件循环,然后通过asyncio.run_coroutine_threadsafe(). 请参阅文档中的协程和多线程部分了解这意味着什么。
这是一个模块的实现,该模块将运行这样一个事件循环线程,并为您提供安排协程在该循环中运行的实用程序:
import asyncio
import itertools
import threading
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
self.started = threading.Event()
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.call_later(0, self.started.set)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
try:
shutdown_executor = loop.shutdown_default_executor()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_executor)
asyncio.set_event_loop(None)
loop.close()
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
if _loop_thread is None:
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
# give the thread up to a second to produce a loop
_loop_thread.started.wait(1)
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
"""Run the coroutine in the event loop running in a separate thread
Returns a Future, call Future.result() to get the output
"""
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
Run Code Online (Sandbox Code Playgroud)
您可以使用run_coroutine()此处定义的函数来安排异步例程。使用返回的Future实例来控制协程:
Future.result()。你可以给它一个超时时间;如果超时内没有产生任何结果,协程将自动取消。.cancelled(),.running()和.done()方法查询协程的状态。run_coroutine()) )。对于您的特定示例, whereabar()不返回任何结果,您可以忽略返回的未来,如下所示:
@app.route("/")
def notify():
run_coroutine(abar("abar"))
return "OK"
Run Code Online (Sandbox Code Playgroud)
请注意,在 Python 3.8 之前,您不能使用在单独线程上运行的事件循环来创建子进程!请参阅我对Python3 Flask asyncio subprocess in route hangs for backport of the Python 3.8 ThreadedChildWatcherclass 的回答,以解决此问题。
Tra*_*rry 20
您可以将一些异步功能合并到Flask应用程序中,而无需将它们完全转换为asyncio.
import asyncio
from flask import Flask
async def abar(a):
print(a)
loop = asyncio.get_event_loop()
app = Flask(__name__)
@app.route("/")
def notify():
loop.run_until_complete(abar("abar"))
return "OK"
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)
这将阻止Flask响应,直到异步函数返回,但它仍然允许你做一些聪明的事情.我已经使用这种模式使用aiohttp并行执行许多外部请求,然后当它们完成时,我又回到传统的瓶子中进行数据处理和模板渲染.
import aiohttp
import asyncio
import async_timeout
from flask import Flask
loop = asyncio.get_event_loop()
app = Flask(__name__)
async def fetch(url):
async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
async with session.get(url) as response:
return await response.text()
def fight(responses):
return "Why can't we all just get along?"
@app.route("/")
def index():
# perform multiple async requests concurrently
responses = loop.run_until_complete(asyncio.gather(
fetch("https://google.com/"),
fetch("https://bing.com/"),
fetch("https://duckduckgo.com"),
fetch("http://www.dogpile.com"),
))
# do something with the results
return fight(responses)
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
Run Code Online (Sandbox Code Playgroud)
pgj*_*nes 15
一个更简单的解决方案(在我的偏见视图中)是从Flask 切换到Quart.如果是这样,您的代码段简化为,
import asyncio
from quart import Quart
async def abar(a):
print(a)
app = Quart(__name__)
@app.route("/")
async def notify():
await abar("abar")
return "OK"
if __name__ == "__main__":
app.run(debug=False)
Run Code Online (Sandbox Code Playgroud)
如其他答案中所述,Flask app运行是阻塞的,并且不与asyncio循环交互.另一方面,Quart是基于asyncio构建的Flask API,所以它应该如你所愿.
另外,作为更新,不再维护 Flask-Aiohttp .
出于同样的原因,您将看不到此打印:
if __name__ == "__main__":
app.run(debug=False, use_reloader=False)
print('Hey!')
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
loop.run_forever()永远不会被调用,因为已经指出的@dirn app.run也会阻塞。
运行全局阻止事件循环-这是您可以运行asyncio协程和任务的唯一方法,但它与运行阻止Flask应用程序(或通常与任何其他此类应用程序)不兼容。
如果要使用异步Web框架,则应选择一个异步创建的框架。例如,现在可能最受欢迎的是aiohttp:
from aiohttp import web
async def hello(request):
return web.Response(text="Hello, world")
if __name__ == "__main__":
app = web.Application()
app.router.add_get('/', hello)
web.run_app(app) # this runs asyncio event loop inside
Run Code Online (Sandbox Code Playgroud)
更新:
关于您尝试在后台线程中运行事件循环的信息。我没有做太多研究,但是似乎与胎面安全性有某种关系:许多异步对象不是线程安全的。如果您以此方式更改代码,它将起作用:
def _create_task():
asyncio.ensure_future(abar("abar"), loop=worker_loop)
@app.route("/")
def notify():
worker_loop.call_soon_threadsafe(_create_task)
return "OK"
Run Code Online (Sandbox Code Playgroud)
但是,这又是一个坏主意。这不仅很不方便,而且我想也没有多大意义:如果您要使用线程来启动异步,为什么不只在Flask中使用线程而不是异步呢?您将拥有所需的Flask和并行化。
如果我仍然不能说服您,请至少看看Flask-aiohttp项目。它与Flask api十分接近,我想您尝试做的还要更好。
| 归档时间: |
|
| 查看次数: |
22338 次 |
| 最近记录: |