如何asyncio.gather成块的任务+使用具有TCP连接限制的信号量?

Yan*_*ler 3 python-asyncio aiohttp

我有一个大型 (1M) 数据库结果集,我想为每一行调用 REST API。

API 可以接受批量请求,但我不确定如何对rows生成器进行切片,以便每个任务处理一个行列表,例如 10 行。我宁愿不预先读取所有行并坚持使用生成器。

在一个 http 请求中发送my_function一个列表是很容易的,但是呢asyncio.gather?也许其中之一itertools可以提供帮助。

请参阅下面的通用伪代码进行说明:

async def main(rows):
    async with aiohttp.ClientSession() as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))
Run Code Online (Sandbox Code Playgroud)

注意:它们results很小,基本上是每行的确认值。

顺便说一句,

  • asyncio.gather()可以(有效)处理的任务数量是否有限制?
  • 当前gather()将所有请求/任务加载到内存中,消耗 50GB(!)。如何即时读取和传递行和任务以减少内存使用?这是asyncio.BoundedSemaphore()用来做什么的吗?
  • TCP 连接限制为 500,因为 REST Web 服务器可以接受这个数量。如果信号量发挥作用,该值应该是多少,即设置信号量> TCP连接限制是否有意义?

aiohttp很棒asyncio但很难理解 - 我同意这篇文章

asyncio 一直在变化,所以要警惕旧的 Stack Overflow 答案。其中许多没有跟上当前的最佳实践

编辑

我刚刚尝试使用 aasyncio.BoundedSemaphore(100)并且内存使用量大致相同(45GB) - 不确定它比连接限制有任何好处

use*_*342 10

基于信号量的解决方案对大量任务的内存使用没有帮助,因为您仍然需要提前创建所有协程和任务。所有协程都将开始执行,只是其中大多数会立即挂起,直到信号量让它们继续执行。

相反,您可以创建固定数量的工作线程并通过队列向它们提供数据库行:

async def worker(queue, session, results):
    while True:
        row = await queue.get()
        results.append(await my_function(row, session))
        # Mark the item as processed, allowing queue.join() to keep
        # track of remaining work and know when everything is done.
        queue.task_done()

async def main(rows):
    N_WORKERS = 50
    queue = asyncio.Queue(N_WORKERS)
    results = []
    async with aiohttp.ClientSession() as session:
        # create 50 workers and feed them tasks
        workers = [asyncio.create_task(worker(queue, session, results))
                   for _ in range(N_WORKERS)]
        # Feed the database rows to the workers. The fixed-capacity of the
        # queue ensures that we never hold all rows in the memory at the
        # same time. (When the queue reaches full capacity, this will block
        # until a worker dequeues an item.)
        async for row in rows:
            await queue.put(row)
        # Wait for all enqueued items to be processed.
        await queue.join()
    # The workers are now idly waiting for the next queue item and we
    # no longer need them.
    for worker in workers:
        worker.cancel()
    return results
Run Code Online (Sandbox Code Playgroud)

请注意,它rows应该是一个异步生成器。如果是普通的生成器,它可能会阻塞事件循环并成为瓶颈。如果您的数据库不支持异步接口,请参阅此答案,了解通过在专用线程中运行阻塞生成器将其转换为异步的方法。

要批量处理项目,您可以构建一个中间列表并分派它。aiostream或者您可以使用运算符附带的优秀库来chunks执行此操作:

async with aiostream.stream.chunks(rows, 10).stream() as chunks:
    async for batch in chunks:
         await queue.put(batch)  # enqueue a batch of 10 rows
Run Code Online (Sandbox Code Playgroud)