asyncio.run 的文档指出:
该函数总是创建一个新的事件循环并在最后关闭它。它应该用作 asyncio 程序的主要入口点,并且最好只调用一次。
但它没有说为什么。我有一个非异步程序需要调用异步内容。我可以asyncio.run在每次到达异步部分时使用,还是这是不安全/错误的?
就我而言,我有几个async想要的协程gather并并行运行以完成。当它们全部完成后,我想继续我的同步代码。
async my_task(url):
# request some urls or whatever
integration_tasks = [my_task(url1), my_task(url2)]
async def gather_tasks(*integration_tasks):
return await asyncio.gather(*integration_tasks)
def complete_integrations(*integration_tasks):
return asyncio.run(gather_tasks(*integration_tasks))
print(complete_integrations(*integration_tasks))
Run Code Online (Sandbox Code Playgroud)
asyncio.run()多次运行协程吗?这实际上是一个有趣且非常重要的问题。
正如asyncio (python3.9) 的文档所述:
该函数总是创建一个新的事件循环并在最后关闭它。它应该用作 asyncio 程序的主要入口点,并且最好只调用一次。
它并不禁止多次调用它。此外,还有一种从同步代码调用协程的旧方法,即:
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
Run Code Online (Sandbox Code Playgroud)
现已弃用,因为get_event_loop()方法,文档说:
还可以考虑使用 asyncio.run() 函数,而不是使用较低级别的函数来手动创建和关闭事件循环。
自版本 3.10 起已弃用:如果没有正在运行的事件循环,则会发出弃用警告。在未来的 Python 版本中,该函数将是 get_running_loop() 的别名。
因此,在未来的版本中,如果已经运行的事件循环不存在,它将不会产生新的事件循环!文档建议使用asyncio.run()if 你想在没有新循环的情况下自动生成新循环。
做出这样的决定是有充分理由的。即使您有一个事件循环并且您将成功地使用它来执行协程,您还必须记住做一些事情:
正确完成事件循环到底需要做什么您可以阅读此源代码。
手动管理事件循环(如果没有正在运行的事件循环)是一个微妙的过程,最好不要这样做,除非有人知道他在做什么。
所以是的,我认为从同步代码运行异步函数的正确方法是调用asyncio.run(). 但它仅适用于完全同步的应用程序。如果已经有正在运行的事件循环,它可能会失败(未测试)。在这种情况下,只需await使用它或使用get_runing_loop().run_untilcomplete(coro).
对于此类同步应用程序,使用asyncio.run()它是安全的方法,实际上是唯一安全的方法,并且可以多次调用它。
文档说“您应该只调用它一次”的原因是,整个异步应用程序通常只有一个入口点。它简化了事情并实际上提高了性能,因为为事件循环设置精简也需要一些时间。但是,如果您的应用程序中没有可用的单个循环,您应该使用多个调用来asyncio.run()多次运行协程。
除了讨论对 的多次调用之外asyncio.run(),我还想解决另一个问题。@jwal 在评论中说:
asyncio 不是并行处理。文档中是这么说的。[...] 如果您想要并行,请在具有单独 CPU 核心的计算机上的单独进程中运行,而不是单独的线程,而不是单独的事件循环。
暗示 asyncio 不适合并行处理,这可能会被误解并误导性地得出这样的结论:它不会带来性能增益,但这并不总是正确的。而且它通常是假的!
因此,任何时候您可以将作业委托给外部进程(不仅是 python 进程,还可以是数据库工作进程、http 调用,最好是任何 TCP 套接字调用),您都可以使用 asyncio 来提高性能。在绝大多数情况下,当您使用公开异步接口的库时,该库的作者会努力最终等待网络/套接字/进程调用的结果。当来自此类套接字的响应尚未准备好时,事件循环可以完全自由地执行任何其他任务。如果循环有多个这样的任务,它将获得性能。
此类情况的典型示例是调用 HTTP 端点。在某些时候,将会有一个网络调用,因此 python 线程在等待数据出现在 TCP 套接字缓冲区上时可以自由地执行其他工作。我有一个例子!
该示例使用 httpx 库来比较多次调用OpenWeatherMap API的性能。有两个功能:
get_weather_async()get_weather_sync()第一个对 http API 执行 8 个请求,但安排这些请求在事件循环上使用asyncio.gather().
第二个按顺序执行8个同步请求。
为了调用异步函数,我实际上使用的是asyncio.run()方法。而且,我正在使用timeit模块执行此类调用asyncio.run()4 次。所以在单个Python应用程序中,asyncio.run()被调用了4次,只是为了挑战我之前的考虑。
from time import time
import httpx
import asyncio
import timeit
from random import uniform
class AsyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.AsyncClient = httpx.AsyncClient(base_url=base_url)
async def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = await self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()
class SyncWeatherApi:
def __init__(
self, base_url: str = "https://api.openweathermap.org/data/2.5"
) -> None:
self.client: httpx.Client = httpx.Client(base_url=base_url)
def weather(self, lat: float, lon: float, app_id: str) -> dict:
response = self.client.get(
"/weather",
params={
"lat": lat,
"lon": lon,
"appid": app_id,
"units": "metric",
},
)
response.raise_for_status()
return response.json()
def get_random_locations() -> list[tuple[float, float]]:
"""generate 8 random locations in +/-europe"""
return [(uniform(45.6, 52.3), uniform(-2.3, 29.4)) for _ in range(8)]
async def get_weather_async(locations: list[tuple[float, float]]):
api = AsyncWeatherApi()
return await asyncio.gather(
*[api.weather(lat, lon, api_key) for lat, lon in locations]
)
def get_weather_sync(locations: list[tuple[float, float]]):
api = SyncWeatherApi()
return [api.weather(lat, lon, api_key) for lat, lon in locations]
api_key = "secret"
def time_async_job(repeat: int = 1):
locations = get_random_locations()
def run():
return asyncio.run(get_weather_async(locations))
duration = timeit.Timer(run).timeit(repeat)
print(
f"[ASYNC] In {duration}s: done {len(locations)} API calls, all"
f" repeated {repeat} times"
)
def time_sync_job(repeat: int = 1):
locations = get_random_locations()
def run():
return get_weather_sync(locations)
duration = timeit.Timer(run).timeit(repeat)
print(
f"[SYNC] In {duration}s: done {len(locations)} API calls, all repeated"
f" {repeat} times"
)
if __name__ == "__main__":
time_sync_job(4)
time_async_job(4)
Run Code Online (Sandbox Code Playgroud)
最后打印了性能比较。它说:
[SYNC] In 5.5580058859995916s: done 8 API calls, all repeated 4 times
[ASYNC] In 2.865574334995472s: done 8 API calls, all repeated 4 times
Run Code Online (Sandbox Code Playgroud)
这 4 次重复只是为了表明您可以安全地运行asyncio.run()多次。它实际上对测量异步 http 调用的性能产生了破坏性影响,因为所有 32 个请求实际上是在 8 个异步任务的 4 个同步批次中运行的。只是为了比较一批 32 个请求的性能:
[SYNC] In 4.373898585996358s: done 32 API calls, all repeated 1 times
[ASYNC] In 1.5169846520002466s: done 32 API calls, all repeated 1 times
Run Code Online (Sandbox Code Playgroud)
所以,是的,如果只使用适当的异步库,它可以并且通常会带来性能提升(如果库公开异步 API,它通常会故意这样做,因为知道某处会有网络调用)。
| 归档时间: |
|
| 查看次数: |
4589 次 |
| 最近记录: |