与asyncio有限的并发性

Shr*_*han 18 python concurrency asynchronous python-3.x python-asyncio

假设我们有一堆下载链接,每个链接可能需要不同的下载时间.我只允许使用最多3个连接下载.现在,我想确保使用asyncio有效地执行此操作.

这就是我想要实现的目标:在任何时候,尽量确保我至少运行3次下载.

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----
Run Code Online (Sandbox Code Playgroud)

数字代表下载链接,而连字符代表等待下载.

这是我正在使用的代码

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

输出符合预期:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8
Run Code Online (Sandbox Code Playgroud)

但这是我的问题:

  1. 目前,我只是等待9秒钟来保持主要功能运行,直到下载完成.在退出main函数之前是否有一种等待上次下载完成的有效方法?(我知道有asyncio.wait,但是我需要存储它的所有任务引用才能工作)

  2. 什么是做这种任务的好图书馆?我知道javascript有很多异步库,但是Python呢?

编辑:2.什么是一个好的库来处理常见的异步模式?(像https://www.npmjs.com/package/async这样的东西)

And*_*rei 56

我用了 Mikhails 的回答,最后得到了这个小宝石

async def gather_with_concurrency(n, *tasks):
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task
    return await asyncio.gather(*(sem_task(task) for task in tasks))
Run Code Online (Sandbox Code Playgroud)

您将运行而不是正常收集

await gather_with_concurrency(100, *my_coroutines)
Run Code Online (Sandbox Code Playgroud)

  • 看到函数中的函数,我立刻想到了装饰器。我做了一点尝试,你可以使用装饰器来实现它,无论是使用固定信号量值还是动态信号量值;然而,这里的解决方案提供了更大的灵活性。 (8认同)
  • 这是一个很好的实用函数,+1。 (6认同)
  • “gather_with_concurrency”的“tasks”参数有点误导,它意味着您可以将该函数与使用“asyncio.create_task”创建的多个任务一起使用。然而在这种情况下它不起作用,因为“create_task”实际上是在事件循环中立即执行协程。由于“gather_with_concurrency”需要协程,因此参数应该命名为“coros”。 (3认同)
  • 这是一个非常精彩、清晰、简短的例子! (2认同)
  • @JT 这种方法无法处理设计任务。整个想法是,您调用的协程不知道这种情况正在发生,并且等待是由“gather_with_concurrency”处理的。这对于协程来说是可能的,根据定义,协程在将它们提交到事件循环(即从它们中创建任务)之前不会运行。如果你已经有任务了,那就意味着协程已经开始运行了,你的`async with`就没用了。您当然可以将“async with”添加到任务本身,但是您不需要从“gather_with_concurrency”开始。 (2认同)

Mik*_*mov 36

如果我没弄错的话你正在寻找asyncio.Semaphore.用法示例:

import asyncio
from random import randint


async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))


sem = asyncio.Semaphore(3)


async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)


async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
Run Code Online (Sandbox Code Playgroud)

输出:

downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6
Run Code Online (Sandbox Code Playgroud)

aiohttp可以在此处找到异步下载的示例.

  • @Shridharshan从我的经验asyncio本身包含你通常需要的一切.查看[synchronization primitives](https://docs.python.org/3/library/asyncio-sync.html#synchronization-primitives)和模块的[functions](https://docs.python.org/ 3/library/asyncio-task.html #task-functions). (3认同)
  • @MikhailGerasimov 调用 `asyncio.ensure_future()` 是多余的,因为 `async.gather()` 无论如何都会在内部调用它([source](https://github.com/python/cpython/blob/master/Lib/asyncio/tasks .py#L762))。然而,调用变量“tasks”将是“错误的”,因为这些还不是任务。 (3认同)
  • @politicalscientist 这意味着在任何给定时间点不能同时激活超过 3 个请求。 (3认同)
  • 这将停止扩展 1k-10k 任务。这会将所有任务添加到事件循环的开头,因此事件循环将花费大部分时间在循环调度程序中尝试找到下一个要运行的任务,而不是实际运行任务!您想要做的是限制事件循环中的任务数量,如下答案所示:/sf/answers/3393921541/ (3认同)
  • 是否有一个好的 Python 异步库来处理常见的异步编程模式?就像著名的 JavaScript 异步包一样。 (2认同)

use*_*342 15

您基本上需要一个固定大小的下载任务.asyncio没有开箱即用的这种功能,但很容易创建一个:只需保留一组任务,不要让它超过限制.虽然问题表明你不愿意沿着这条路走下去,但代码更加优雅:

async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)
Run Code Online (Sandbox Code Playgroud)

另一种方法是创建一个固定数量的协同程序来进行下载,就像固定大小的线程池一样,并使用它来提供它们的工作asyncio.Queue.这消除了手动限制下载次数的需要,这将自动受到调用协同程序数量的限制download():

# download() defined as above

async def download_from(q):
    while True:
        code = await q.get()
        if code is None:
            # pass on the word that we're done, and exit
            await q.put(None)
            break
        await download(code)

async def main(loop):
    q = asyncio.Queue()
    dltasks = [loop.create_task(download_from(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    # Inform the consumers there is no more work.
    await q.put(None)
    await asyncio.wait(dltasks)
Run Code Online (Sandbox Code Playgroud)

至于你的另一个问题,显而易见的选择是aiohttp.

  • @OrangeDog这实际上是故意的,因为OP的代码是使用手动`while`循环.我们的想法是使他们现有的代码(保留非传统习语)适应所需的语义. (2认同)
  • @Krissh由于您不提供代码或确切的错误消息,因此很难说出您所指的内容,但请放心,`asyncio.Semaphore` 并未被弃用。已弃用并将被删除的是其构造函数的“loop”参数,您可以省略该参数,一切都会正常工作。(这并非特定于信号量,“loop”参数正在被[全面]删除(/sf/answers/4222070331/)。) (2认同)

ben*_*min 12

如果您有一个生成任务的生成器,则可能有更多的任务无法同时容纳在内存中。

经典的asyncio.Semaphore上下文管理器模式将所有任务同时放入内存中。

我不喜欢这个asyncio.Queue图案。您可以防止它将所有任务预加载到内存中(通过设置maxsize=1),但它仍然需要样板来定义、启动和关闭工作协程(从队列中消耗),并且您必须确保工作人员不会失败如果任务抛出异常。感觉不太像Python,就像实现你自己的multiprocessing.pool.

相反,这里有一个替代方案:

sem = asyncio.Semaphore(n := 5) # specify maximum concurrency

async def task_wrapper(args):
    try:
        await my_task(*args)
    finally:
        sem.release()

for args in my_generator: # may yield too many to list
    await sem.acquire() 
    asyncio.create_task(task_wrapper(args))

# wait for all tasks to complete
for i in range(n):
    await sem.acquire()
Run Code Online (Sandbox Code Playgroud)

当有足够的活动任务时,这会暂停生成器,并让事件循环清理已完成的任务。请注意,对于较旧的 python 版本,请替换create_taskensure_future.


小智 10

asyncio-pool 库正是您所需要的。

https://pypi.org/project/asyncio-pool/


LIST_OF_URLS = ("http://www.google.com", "......")

pool = AioPool(size=3)
await pool.map(your_download_coroutine, LIST_OF_URLS)
Run Code Online (Sandbox Code Playgroud)


小智 9

小更新:不再需要创建循环。我调整了下面的代码。只是稍微清理一下。

# download(code) is the same

async def main():
    no_concurrent = 3
    dltasks = set()
    for i in range(9):
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(asyncio.create_task(download(i)))
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

if __name__ == '__main__':
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)


yan*_*iao 9

使用信号量,您还可以创建装饰器来包装函数

import asyncio
from functools import wraps
def request_concurrency_limit_decorator(limit=3):
    # Bind the default event loop 
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor
Run Code Online (Sandbox Code Playgroud)

然后,将装饰器添加到原始下载功能中。

@request_concurrency_limit_decorator(limit=...)
async def download(...):
    ...
Run Code Online (Sandbox Code Playgroud)

现在你可以像以前一样调用下载函数,但是用Semaphore来限制并发。

await download(...)
Run Code Online (Sandbox Code Playgroud)

需要注意的是,装饰器函数执行时,创建的Semaphore会绑定到默认的事件循环,因此无法调用asyncio.run创建新的循环。相反,调用asyncio.get_event_loop().run...以使用默认事件循环。

asyncio.Semaphore RuntimeError:任务将 Future 附加到不同的循环