Lio*_*ead 1 python python-asyncio
我需要为 aiohttp 创建工作队列。
现在我使用 asyncio.gather,但它的工作方式错误:
这就是我想做的:
第一个可以用以下代码实现:
async def some_stuff(_):
pass
tasks = []
for i in data:
tasks.append(do_stuff(i))
asyncio.run(asyncio.gather(*tasks))
Run Code Online (Sandbox Code Playgroud)
我需要的例子
据我了解,您想要并行运行 5 个任务。当其中一项任务完成后,您希望立即开始一项新任务。为此,它asyncio.gather不起作用,因为它需要等待所有任务完成才能继续。
我建议遵循以下原则:
from collections import deque
import random
import asyncio
class RunSome:
def __init__(self, task_count=5):
self.task_count = task_count
self.running = set()
self.waiting = deque()
@property
def running_task_count(self):
return len(self.running)
def add_task(self, coro):
if len(self.running) >= self.task_count:
self.waiting.append(coro)
else:
self._start_task(coro)
def _start_task(self, coro):
self.running.add(coro)
asyncio.create_task(self._task(coro))
async def _task(self, coro):
try:
return await coro
finally:
self.running.remove(coro)
if self.waiting:
coro2 = self.waiting.popleft()
self._start_task(coro2)
async def main():
runner = RunSome()
async def rand_delay():
rnd = random.random() + 0.5
print("Task started", asyncio.current_task().get_name(),
runner.running_task_count)
await asyncio.sleep(rnd)
print("Task ended", asyncio.current_task().get_name(),
runner.running_task_count)
for _ in range(50):
runner.add_task(rand_delay())
# keep the program alive until all the tasks are done
while runner.running_task_count > 0:
await asyncio.sleep(0.1)
if __name__ == "__main__":
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
输出:
Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1
Run Code Online (Sandbox Code Playgroud)
协程是 Python 中的第一类对象。因此,它们可以放入列表和集合中。
所有任务创建均由 处理RunSome。您将其传递给要执行的协程。它知道当前有多少任务正在运行,并决定立即创建新任务或将协程添加到待处理任务队列中。当任务完成时,它会从队列中获取一个新的协程(如果有)。正在运行的任务数量永远不会超过传递给构造函数的阈值计数(默认为 5)。这些任务是传递的协程的包装器。
您必须弄清楚如何处理返回值(如果有)。这里的错误处理是基本的,但由于 try:finally: 块,它确实保持了正在运行的任务的正确数量。
| 归档时间: |
|
| 查看次数: |
3022 次 |
| 最近记录: |