asyncio.gather(*tasks) fails to await only a subset of all tasks

edo*_*ron 5 python asynchronous async-await python-asyncio

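The problem to solve (simplified)

Suppose I have 26 tasks to run in parallel. To minimize the load on the server, I decided to run them 10 at a time: first 10 tasks in parallel, then the next 10, and finally the remaining 6.

I wrote a simple script to implement this behavior: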

import asyncio
from string import ascii_uppercase
from typing import List

TASK_NAMES = ascii_uppercase  # 26 fake tasks in total


class BatchWorker:
    """Run a list of tasks in batch."""

    BATCH_SIZE = 10

    def __init__(self, tasks: List[asyncio.Task]):
        self._tasks = list(tasks)

    @property
    def batch_of_tasks(self):
        """Yield all tasks by chunks of `BATCH_SIZE`"""
        start = 0
        while 'there are items remaining in the list':
            end = start + self.BATCH_SIZE
            chunk = self._tasks[start:end]
            if not chunk:
                break
            yield chunk
            start = end

    async def run(self):
        print(f'Running {self.BATCH_SIZE} tasks at a time')
        for batch in self.batch_of_tasks:
            print(f'\nWaiting for {len(batch)} tasks to complete...')
            await asyncio.gather(*batch)
            print('\nSleeping...\n---')
            await asyncio.sleep(1)


async def task(name: str):
    print(f"Task '{name}' is running...")
    await asyncio.sleep(3)  # Pretend to do something


async def main():
    tasks = [
      asyncio.create_task(task(name))
      for name in TASK_NAMES
    ]
    worker = BatchWorker(tasks)
    await worker.run()


if __name__ == '__main__':
    asyncio.run(main())

What I would expect

I expected the logs to look like this:

Task A is running
[...]
Task J is running
Sleeping
---
Task K is running
[...]
Task T is running
Sleeping
---
[...]

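...you get the idea.

What I actually get

However, in the very first iteration, the worker waits for all 26 tasks to complete, even though I asked it to gather only 10 of them. See the logs: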

Running 10 tasks at a time

Waiting for 10 tasks to complete...
Task 'A' is running...
Task 'B' is running...
Task 'C' is running...
Task 'D' is running...
Task 'E' is running...
Task 'F' is running...
Task 'G' is running...
Task 'H' is running...
Task 'I' is running...
Task 'J' is running...
Task 'K' is running...
Task 'L' is running...
Task 'M' is running...
Task 'N' is running...
Task 'O' is running...
Task 'P' is running...
Task 'Q' is running...
Task 'R' is running...
Task 'S' is running...
Task 'T' is running...
Task 'U' is running...
Task 'V' is running...
Task 'W' is running...
Task 'X' is running...
Task 'Y' is running...
Task 'Z' is running...

Sleeping...
---

Waiting for 10 tasks to complete...

Sleeping...
---

Waiting for 6 tasks to complete...

Sleeping...
---

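As you can see, there are 3 batches in total (as expected), but only the first one does anything. The remaining 2 have nothing left to do.

My question

  1. Given that the official documentation states that .gather() will run concurrently only the awaitables provided as arguments, why does my script run all of my tasks instead of chunks of them?

  2. What else should I use to make it work the way I want?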

use*_*342 5

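gather doesn't really "run" the awaitables; it just sleeps while the event loop does its thing, and wakes up once the awaitables it received are done. What your code does is:

  1. Use asyncio.create_task() to spawn a bunch of awaitables in the background.
  2. Use asyncio.gather() to wait, in batches, until some of them have finished.

The fact that gather() in #2 receives only a subset of the tasks created in #1 does not prevent the rest of the tasks created in #1 from happily running.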
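The point above can be checked with a small experiment. This is only a minimal sketch: the names `job`, `demo`, and `started` are made up for illustration and do not appear in the question's script. It creates three tasks with create_task(), gathers only the first one, and records which jobs had already started by the time that gather() returned.

```python
import asyncio

started = []  # names of jobs that have begun executing


async def job(name: str) -> None:
    started.append(name)
    await asyncio.sleep(0.01)  # pretend to do something


async def demo() -> list:
    # create_task() schedules all three jobs on the event loop right away.
    tasks = [asyncio.create_task(job(name)) for name in "ABC"]
    # Wait for the first task only; B and C are never passed to this gather().
    await asyncio.gather(tasks[0])
    snapshot = list(started)
    # Let the remaining tasks finish so nothing is left pending at shutdown.
    await asyncio.gather(*tasks[1:])
    return snapshot


snapshot = asyncio.run(demo())
print(snapshot)  # ['A', 'B', 'C']
```

Even though only task A was gathered, B and C had started as well, which is the same effect as all 26 tasks printing during the first batch.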

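To fix the issue, you must postpone calling create_task() until the last possible moment. In fact, since gather() calls ensure_future() on its arguments (and ensure_future(), when called with a coroutine object, ends up calling create_task()), you don't need to call create_task() at all. If you remove the call to create_task() from main and simply pass the coroutine objects to BatchWorker (and subsequently to gather), the tasks will be both scheduled and awaited in batches, just as you want: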

async def main():
    tasks = [task(name) for name in TASK_NAMES]
    worker = BatchWorker(tasks)
    await worker.run()
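To see the batching take effect, here is a self-contained sketch of the same idea, reduced to 5 jobs and a batch size of 2 so the order is easy to follow. The names `job`, `run_in_batches`, and `events` are hypothetical stand-ins for the question's `task`, `BatchWorker.run`, and its print output; the delays are shortened as well.

```python
import asyncio
from typing import Coroutine, List

events = []  # ordered record of what happened, for inspection


async def job(name: str) -> None:
    events.append(f"start {name}")
    await asyncio.sleep(0.01)  # pretend to do something


async def run_in_batches(coros: List[Coroutine], batch_size: int) -> None:
    for start in range(0, len(coros), batch_size):
        batch = coros[start:start + batch_size]
        # gather() wraps each coroutine object in a task at this point,
        # so this is the moment the batch actually gets scheduled.
        await asyncio.gather(*batch)
        events.append("batch done")


async def main() -> None:
    # Calling job(name) only builds a coroutine object; nothing runs yet.
    coros = [job(name) for name in "ABCDE"]
    await run_in_batches(coros, batch_size=2)


asyncio.run(main())
print(events)
# ['start A', 'start B', 'batch done', 'start C', 'start D', 'batch done',
#  'start E', 'batch done']
```

No job starts before its own batch is gathered. If the same change is made to the original script, note that the `List[asyncio.Task]` annotation on `BatchWorker.__init__` no longer describes what is passed in, since the list now holds coroutine objects rather than tasks.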