使用asyncio.run,多次运行是否安全?

Sco*_*ord 9 python-asyncio

asyncio.run 的文档指出:

该函数总是创建一个新的事件循环并在最后关闭它。它应该用作 asyncio 程序的主要入口点,并且最好只调用一次。

但它没有说为什么。我有一个非异步程序需要调用异步内容。我可以asyncio.run在每次到达异步部分时使用,还是这是不安全/错误的?

就我而言,我有几个async想要的协程gather并并行运行以完成。当它们全部完成后,我想继续我的同步代码。

async my_task(url):
    # request some urls or whatever

integration_tasks = [my_task(url1), my_task(url2)]

async def gather_tasks(*integration_tasks):
    return await asyncio.gather(*integration_tasks)


def complete_integrations(*integration_tasks):
    return asyncio.run(gather_tasks(*integration_tasks))

print(complete_integrations(*integration_tasks))
Run Code Online (Sandbox Code Playgroud)

igo*_*993 8

我可以用来asyncio.run()多次运行协程吗?

这实际上是一个有趣且非常重要的问题。

正如asyncio (python3.9) 的文档所述:

该函数总是创建一个新的事件循环并在最后关闭它。它应该用作 asyncio 程序的主要入口点,并且最好调用一次。

它并不禁止多次调用它。此外,还有一种从同步代码调用协程的旧方法,即:

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
Run Code Online (Sandbox Code Playgroud)

现已弃用,因为get_event_loop()方法,文档说:

还可以考虑使用 asyncio.run() 函数,而不是使用较低级别的函数来手动创建和关闭事件循环。

自版本 3.10 起已弃用:如果没有正在运行的事件循环,则会发出弃用警告。在未来的 Python 版本中,该函数将是 get_running_loop() 的别名。

因此,在未来的版本中,如果已经运行的事件循环不存在,它将不会产生新的事件循环!文档建议使用asyncio.run()if 你想在没有新循环的情况下自动生成新循环。

做出这样的决定是有充分理由的。即使您有一个事件循环并且您将成功地使用它来执行协程,您还必须记住做一些事情:

  • 关闭事件循环
  • 消耗未消耗的生成器(在协程失败的情况下最重要)
  • ...可能更多,我什至不想在这里提及

正确完成事件循环到底需要做什么您可以阅读此源代码

手动管理事件循环(如果没有正在运行的事件循环)是一个微妙的过程,最好不要这样做,除非有人知道他在做什么

所以是的,我认为从同步代码运行异步函数的正确方法是调用asyncio.run(). 但它仅适用于完全同步的应用程序。如果已经有正在运行的事件循环,它可能会失败(未测试)。在这种情况下,只需await使用它或使用get_runing_loop().run_untilcomplete(coro).

对于此类同步应用程序,使用asyncio.run()它是安全的方法,实际上是唯一安全的方法,并且可以多次调用它。

文档说“您应该只调用它一次”的原因是,整个异步应用程序通常只有一个入口点。它简化了事情并实际上提高了性能,因为为事件循环设置精简也需要一些时间。但是,如果您的应用程序中没有可用的单个循环,您应该使用多个调用来asyncio.run()多次运行协程。

有性能提升吗?

除了讨论对 的多次调用之外asyncio.run(),我还想解决另一个问题。@jwal 在评论中说:

asyncio 不是并行处理。文档中是这么说的。[...] 如果您想要并行,请在具有单独 CPU 核心的计算机上的单独进程中运行,而不是单独的线程,而不是单独的事件循环。

暗示 asyncio 不适合并行处理,这可能会被误解并误导性地得出这样的结论:它不会带来性能增益,但这并不总是正确的。而且它通常是假的!

因此,任何时候您可以将作业委托给外部进程(不仅是 python 进程,还可以是数据库工作进程、http 调用,最好是任何 TCP 套接字调用),您都可以使用 asyncio 来提高性能。在绝大多数情况下,当您使用公开异步接口的库时,该库的作者会努力最终等待网络/套接字/进程调用的结果。当来自此类套接字的响应尚未准备好时,事件循环可以完全自由地执行任何其他任务。如果循环有多个这样的任务,它将获得性能

此类情况的典型示例是调用 HTTP 端点。在某些时候,将会有一个网络调用,因此 python 线程在等待数据出现在 TCP 套接字缓冲区上时可以自由地执行其他工作。我有一个例子!

该示例使用 httpx 库来比较多次调用OpenWeatherMap API的性能。有两个功能:

  • get_weather_async()
  • get_weather_sync()

第一个对 http API 执行 8 个请求,但安排这些请求事件循环上使用asyncio.gather().

第二个按顺序执行8个同步请求。

为了调用异步函数,我实际上使用的是asyncio.run()方法。而且,我正在使用timeit模块执行此类调用asyncio.run()4 次。所以在单个Python应用程序中,asyncio.run()被调用了4次,只是为了挑战我之前的考虑。

from time import time
import httpx
import asyncio
import timeit
from random import uniform


class AsyncWeatherApi:
    def __init__(
        self, base_url: str = "https://api.openweathermap.org/data/2.5"
    ) -> None:

        self.client: httpx.AsyncClient = httpx.AsyncClient(base_url=base_url)

    async def weather(self, lat: float, lon: float, app_id: str) -> dict:
        response = await self.client.get(
            "/weather",
            params={
                "lat": lat,
                "lon": lon,
                "appid": app_id,
                "units": "metric",
            },
        )

        response.raise_for_status()

        return response.json()


class SyncWeatherApi:
    def __init__(
        self, base_url: str = "https://api.openweathermap.org/data/2.5"
    ) -> None:

        self.client: httpx.Client = httpx.Client(base_url=base_url)

    def weather(self, lat: float, lon: float, app_id: str) -> dict:
        response = self.client.get(
            "/weather",
            params={
                "lat": lat,
                "lon": lon,
                "appid": app_id,
                "units": "metric",
            },
        )

        response.raise_for_status()

        return response.json()


def get_random_locations() -> list[tuple[float, float]]:
    """generate 8 random locations in +/-europe"""
    return [(uniform(45.6, 52.3), uniform(-2.3, 29.4)) for _ in range(8)]


async def get_weather_async(locations: list[tuple[float, float]]):
    api = AsyncWeatherApi()
    return await asyncio.gather(
        *[api.weather(lat, lon, api_key) for lat, lon in locations]
    )


def get_weather_sync(locations: list[tuple[float, float]]):
    api = SyncWeatherApi()
    return [api.weather(lat, lon, api_key) for lat, lon in locations]


api_key = "secret"


def time_async_job(repeat: int = 1):
    locations = get_random_locations()

    def run():
        return asyncio.run(get_weather_async(locations))

    duration = timeit.Timer(run).timeit(repeat)
    print(
        f"[ASYNC] In {duration}s: done {len(locations)} API calls, all"
        f" repeated {repeat} times"
    )


def time_sync_job(repeat: int = 1):
    locations = get_random_locations()

    def run():
        return get_weather_sync(locations)

    duration = timeit.Timer(run).timeit(repeat)
    print(
        f"[SYNC] In {duration}s: done {len(locations)} API calls, all repeated"
        f" {repeat} times"
    )


if __name__ == "__main__":
    time_sync_job(4)
    time_async_job(4)
Run Code Online (Sandbox Code Playgroud)

最后打印了性能比较。它说:

[SYNC] In 5.5580058859995916s: done 8 API calls, all repeated 4 times
[ASYNC] In 2.865574334995472s: done 8 API calls, all repeated 4 times
Run Code Online (Sandbox Code Playgroud)

这 4 次重复只是为了表明您可以安全地运行asyncio.run()多次。它实际上对测量异步 http 调用的性能产生了破坏性影响,因为所有 32 个请求实际上是在 8 个异步任务的 4 个同步批次中运行的。只是为了比较一批 32 个请求的性能:

[SYNC] In 4.373898585996358s: done 32 API calls, all repeated 1 times
[ASYNC] In 1.5169846520002466s: done 32 API calls, all repeated 1 times
Run Code Online (Sandbox Code Playgroud)

所以,是的,如果只使用适当的异步库,它可以并且通常会带来性能提升(如果库公开异步 API,它通常会故意这样做,因为知道某处会有网络调用)。