对没有 sleep() 的代码使用 async 是否有意义?

Mor*_*sen 4 python asynchronous python-3.x python-asyncio

我读过很多解释asyncPython 的不同文章。但他们都给出了一个例子asyncio.sleep(x),就像这样:

import asyncio

async def test1 ():
    await asyncio.sleep(1)
    print(1)

async def test2 ():
    print(2)

async def main ():
    await asyncio.gather(test1(), test2())

asyncio.run(main()) #prints 2, then 1
Run Code Online (Sandbox Code Playgroud)

在这种情况下,一切对我来说都很清楚:函数 test1 中的await 表示在执行 asyncio.sleep 期间我们可以做其他事情,例如执行函数 test2。

我不明白的是,如果我不在代码中使用睡眠,异步怎么会有用呢?在这种情况下如何同时运行函数?例如,如何同时运行下面示例中的函数 test1 和 test2?

import asyncio
import time

async def calculate (a):
    return a**a

async def test1 ():
    x = await calculate(1111111)
    print('done!')

async def test2 ():
    for i in range(100):
        print('.', end='')

async def main ():
    await asyncio.gather(test1(), test2())

asyncio.run(main()) #prints 'done!' before the dots
Run Code Online (Sandbox Code Playgroud)

Mar*_*ers 5

当您的代码需要等待时,Asyncio 非常有用。每个await语句都是其他代码运行的机会,这些代码不再需要等待。

文档显示某些内容正在等待的最简单、最简单的方法是函数asyncio.sleep(),因为它简单且易于理解。此外,它不需要外部库或网络服务即可工作。

有很多事情可能需要等待:

  • 任何网络交互都涉及大量的等待;网络通信肯定比 Python 代码慢。

  • 文件系统 I/O 也必须等待很长时间,因为即使使用 SSD 和缓存,I/O 也比 CPU 周期慢。(警告:目前还没有太多选项可以从 asyncio 中利用这一点,因为缺乏操作系统支持。asyncio文件 I/O 的库往往会转而使用线程)。

  • 进程间通信(例如,您在其他地方使用该subprocess模块的地方),因为操作系统(而不是您的代码)决定其他程序何时运行并产生输出。

  • 不同任务之间的协调,其中一件事必须等到另一件事完成后才能完成。通常那些其他事情还没有完成,因为他们正在等待某些事情。

  • 与人类互动也需要等待,因为人比计算机慢得多

任何使用更高级别的构造(例如数据库)都将涉及上述许多内容,因此请务必检查是否有一个asyncio兼容的库可以让您执行您想要的操作。

在你的第二个例子中,几乎没有什么等待。代码必须等待一点的唯一一点是当您使用 时print(),因为这涉及 I/O(写入终端),但print()不是为使用而构建的asyncio,因此您的代码在发生这种情况时无法执行任何其他操作。print()是一个阻塞函数,这意味着 asyncio 在阻塞时不能做任何其他事情。为了让 asyncio 发挥作用,您需要避免阻塞。

我在下面提供了一个更复杂的示例,该脚本下载命令行上给出的文件中列出的多个 URL。它利用该aiohttp来处理 HTTP 请求、aiofiles文件 I/O 以及tqdm显示进度条。可以配置它并行处理的 URL 数量。免责声明:我是该项目的贡献者aiohttp,但不是一个非常活跃的人。

#!/usr/bin/env python
import asyncio
import traceback
from contextlib import AsyncExitStack, contextmanager

import aiofiles
import aiohttp
import tqdm

@contextmanager
def put_exceptions(log, *exceptions):
    exceptions = exceptions or (Exception,)
    try:
        yield
    except exceptions:
        log(f"Exception: {traceback.format_exc()}")

def filename_from_response(response):
    disp = response.content_disposition
    if disp and disp.filename:
        return filename
    return response.url.name

async def worker(queue, session, main_progress):
    async with AsyncExitStack() as worker_context:
        worker_context.enter_context(put_exceptions(print))
        while True:
            url = await queue.get()
            async with AsyncExitStack() as task_context:
                task_context.callback(queue.task_done)
                task_context.callback(main_progress.update)
                async with session.get(url) as response:
                    filename = filename_from_response(response)
                    size = int(response.headers.get("content-length", 0)) or None
                    pbar = tqdm.tqdm(
                        desc=filename,
                        leave=False,
                        total=size,
                        unit="iB",
                        unit_scale=True,
                        unit_divisor=1024,
                        colour="cyan",
                    )
                    task_context.callback(pbar.close)
                    async with aiofiles.open(filename, "wb") as outf:
                        while chunk := await response.content.read(8192):
                            await outf.write(chunk)
                            pbar.update(len(chunk))
                        pbar.refresh()

 async def main(input_file, worker_count):
     with tqdm.tqdm(
         total=0, unit="f", colour="green", miniters=1, smoothing=0.1
     ) as pbar:
         queue = asyncio.Queue()
         async with aiohttp.ClientSession(raise_for_status=True) as session:
             workers = [
                 asyncio.create_task(worker(queue, session, pbar), name=f"worker{i + 1}")
                 for i in range(worker_count)
             ]
             async with aiofiles.open(input_file) as f:
                 async for url in f:
                     queue.put_nowait(url)
                     pbar.total += 1
                     pbar.refresh()
             await queue.join()
             for worker_task in workers:
                 worker_task.cancel()
             await asyncio.wait(workers)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Download URLs to disk, given an input file listing the URLS"
    )
    parser.add_argument("filename")
    parser.add_argument(
        "-w", dest="worker_count", help="worker count", default=5, type=int
    )
    args = parser.parse_args()
    try:
        asyncio.run(main(args.filename, args.worker_count))
    except KeyboardInterrupt:
        pass
Run Code Online (Sandbox Code Playgroud)

使用时,它看起来像这样(从这组示例卫星图像中获取的示例 URL ):

终端屏幕显示脚本输出以及多个 URL 的进度条

该脚本创建了许多额外的任务,每个worker()任务都执行协程,该协程使用共享会话对象对 URL 发出 HTTP 请求,之后循环将响应数据分块写入磁盘;这使得监控进展成为可能。现在有很多等待点:

  • 在循环中逐行读取输入文件async for。这意味着如果脚本必须等待操作系统从磁盘生成数据,则脚本可以执行其他操作。
  • 每个工作人员首先必须通过队列等待 URL 可用。
  • 一旦获得 URL,async with session.get()调用就会导致网络操作必须等待远程服务器开始响应。
  • 对于返回的每个响应,我们必须等待更多数据从远程服务器到达。
  • 对于到达的每个数据块,写入磁盘将需要更多的等待。
  • 从输入文件中读取所有 URL 后,主函数必须等待工作线程清空队列,然后完成下载它们仍在处理的 URL。

tqdm 进度条至少应该可以更容易地看到事情仍在进行,并且下载和磁盘写入是并行发生的。