Mor*_*sen 4 python asynchronous python-3.x python-asyncio
我读过很多解释asyncPython 的不同文章。但他们都给出了一个例子asyncio.sleep(x),就像这样:
import asyncio
async def test1 ():
await asyncio.sleep(1)
print(1)
async def test2 ():
print(2)
async def main ():
await asyncio.gather(test1(), test2())
asyncio.run(main()) #prints 2, then 1
Run Code Online (Sandbox Code Playgroud)
在这种情况下,一切对我来说都很清楚:函数 test1 中的await 表示在执行 asyncio.sleep 期间我们可以做其他事情,例如执行函数 test2。
我不明白的是,如果我不在代码中使用睡眠,异步怎么会有用呢?在这种情况下如何同时运行函数?例如,如何同时运行下面示例中的函数 test1 和 test2?
import asyncio
import time
async def calculate (a):
return a**a
async def test1 ():
x = await calculate(1111111)
print('done!')
async def test2 ():
for i in range(100):
print('.', end='')
async def main ():
await asyncio.gather(test1(), test2())
asyncio.run(main()) #prints 'done!' before the dots
Run Code Online (Sandbox Code Playgroud)
当您的代码需要等待时,Asyncio 非常有用。每个await语句都是其他代码运行的机会,这些代码不再需要等待。
文档显示某些内容正在等待的最简单、最简单的方法是函数asyncio.sleep(),因为它简单且易于理解。此外,它不需要外部库或网络服务即可工作。
有很多事情可能需要等待:
任何网络交互都涉及大量的等待;网络通信肯定比 Python 代码慢。
文件系统 I/O 也必须等待很长时间,因为即使使用 SSD 和缓存,I/O 也比 CPU 周期慢。(警告:目前还没有太多选项可以从 asyncio 中利用这一点,因为缺乏操作系统支持。asyncio文件 I/O 的库往往会转而使用线程)。
进程间通信(例如,您在其他地方使用该subprocess模块的地方),因为操作系统(而不是您的代码)决定其他程序何时运行并产生输出。
不同任务之间的协调,其中一件事必须等到另一件事完成后才能完成。通常那些其他事情还没有完成,因为他们正在等待某些事情。
与人类互动也需要等待,因为人比计算机慢得多。
任何使用更高级别的构造(例如数据库)都将涉及上述许多内容,因此请务必检查是否有一个asyncio兼容的库可以让您执行您想要的操作。
在你的第二个例子中,几乎没有什么等待。代码必须等待一点的唯一一点是当您使用 时print(),因为这涉及 I/O(写入终端),但print()不是为使用而构建的asyncio,因此您的代码在发生这种情况时无法执行任何其他操作。print()是一个阻塞函数,这意味着 asyncio 在阻塞时不能做任何其他事情。为了让 asyncio 发挥作用,您需要避免阻塞。
我在下面提供了一个更复杂的示例,该脚本下载命令行上给出的文件中列出的多个 URL。它利用该aiohttp库来处理 HTTP 请求、aiofiles文件 I/O 以及tqdm显示进度条。可以配置它并行处理的 URL 数量。免责声明:我是该项目的贡献者aiohttp,但不是一个非常活跃的人。
#!/usr/bin/env python
import asyncio
import traceback
from contextlib import AsyncExitStack, contextmanager
import aiofiles
import aiohttp
import tqdm
@contextmanager
def put_exceptions(log, *exceptions):
exceptions = exceptions or (Exception,)
try:
yield
except exceptions:
log(f"Exception: {traceback.format_exc()}")
def filename_from_response(response):
disp = response.content_disposition
if disp and disp.filename:
return filename
return response.url.name
async def worker(queue, session, main_progress):
async with AsyncExitStack() as worker_context:
worker_context.enter_context(put_exceptions(print))
while True:
url = await queue.get()
async with AsyncExitStack() as task_context:
task_context.callback(queue.task_done)
task_context.callback(main_progress.update)
async with session.get(url) as response:
filename = filename_from_response(response)
size = int(response.headers.get("content-length", 0)) or None
pbar = tqdm.tqdm(
desc=filename,
leave=False,
total=size,
unit="iB",
unit_scale=True,
unit_divisor=1024,
colour="cyan",
)
task_context.callback(pbar.close)
async with aiofiles.open(filename, "wb") as outf:
while chunk := await response.content.read(8192):
await outf.write(chunk)
pbar.update(len(chunk))
pbar.refresh()
async def main(input_file, worker_count):
with tqdm.tqdm(
total=0, unit="f", colour="green", miniters=1, smoothing=0.1
) as pbar:
queue = asyncio.Queue()
async with aiohttp.ClientSession(raise_for_status=True) as session:
workers = [
asyncio.create_task(worker(queue, session, pbar), name=f"worker{i + 1}")
for i in range(worker_count)
]
async with aiofiles.open(input_file) as f:
async for url in f:
queue.put_nowait(url)
pbar.total += 1
pbar.refresh()
await queue.join()
for worker_task in workers:
worker_task.cancel()
await asyncio.wait(workers)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Download URLs to disk, given an input file listing the URLS"
)
parser.add_argument("filename")
parser.add_argument(
"-w", dest="worker_count", help="worker count", default=5, type=int
)
args = parser.parse_args()
try:
asyncio.run(main(args.filename, args.worker_count))
except KeyboardInterrupt:
pass
Run Code Online (Sandbox Code Playgroud)
使用时,它看起来像这样(从这组示例卫星图像中获取的示例 URL ):
该脚本创建了许多额外的任务,每个worker()任务都执行协程,该协程使用共享会话对象对 URL 发出 HTTP 请求,之后循环将响应数据分块写入磁盘;这使得监控进展成为可能。现在有很多等待点:
async for。这意味着如果脚本必须等待操作系统从磁盘生成数据,则脚本可以执行其他操作。async with session.get()调用就会导致网络操作必须等待远程服务器开始响应。tqdm 进度条至少应该可以更容易地看到事情仍在进行,并且下载和磁盘写入是并行发生的。