A "lazy" version of asyncio.gather?

Asked by vau*_*tah (score 2). Tags: python, python-asyncio

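I'm using Python's asyncio module with async/await to process a character sequence in chunks concurrently and collect the results in a list. For this I use a chunking function (split) and a chunk-processing function (process_chunk). Both come from a third-party library, and I don't want to change them.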

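Chunking is slow and the number of chunks is not known in advance, which is why I don't want to consume the whole chunk generator at once. Ideally, the code should advance the generator in step with process_chunk's semaphore, i.e., each time that function returns.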

My code:

import asyncio

def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')

async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'

async def process_in_chunks(sequence):
    gen = split(sequence)
    coro = [process_chunk(chunk) for chunk in gen]
    results = await asyncio.gather(*coro)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_in_chunks('ABC'))

This works, but the output shows every "Getting the next chunk: ..." line, followed by "Finished chunking", before the first "Processing chunk: ..." line.

However, this means the generator gen is exhausted before processing starts. I understand why this happens, but how can I change it?
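A minimal sketch of why this happens, using simplified stand-ins for split and process_chunk (assumption: they log events to a list instead of printing, and the semaphore is omitted): star-unpacking has to evaluate the entire iterable before asyncio.gather is even called, so replacing the list comprehension with a generator expression does not make it lazy.

```python
import asyncio

log = []  # records the order in which chunking and processing happen

def split(sequence):
    # Simplified stand-in for the third-party chunker
    for x in sequence:
        log.append(f'chunk {x}')
        yield x

async def process_chunk(chunk):
    # Simplified stand-in for the third-party processor
    log.append(f'process {chunk}')
    await asyncio.sleep(0)
    return 'OK'

async def process_in_chunks(sequence):
    gen = split(sequence)
    # *(...) must produce every argument before gather() is called,
    # so the generator is drained here even without a list comprehension
    return await asyncio.gather(*(process_chunk(chunk) for chunk in gen))

results = asyncio.run(process_in_chunks('ABC'))
print(results)  # ['OK', 'OK', 'OK']
print(log)      # all three 'chunk' entries come before any 'process' entry
```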

Answer by Vin*_*ent (score 5)

If you don't mind an external dependency, you can use aiostream.stream.map:

from aiostream import stream, pipe

# split() and process_chunk() are the functions from the question

async def process_in_chunks(sequence):
    # Asynchronous sequence of chunks
    xs = stream.iterate(split(sequence))
    # Asynchronous sequence of results
    ys = xs | pipe.map(process_chunk, task_limit=2)
    # Aggregation of the results into a list
    zs = ys | pipe.list()
    # Run the stream
    results = await zs
    print(results)

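The chunks are generated lazily and fed to the process_chunk coroutine. The number of coroutines running concurrently is controlled by task_limit, which means the semaphore in process_chunk is no longer needed.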

Output:

Getting the next chunk: A
Processing chunk: A
Getting the next chunk: B
Processing chunk: B
# Pause 3 seconds
Getting the next chunk: C
Processing chunk: C
Finished chunking
# Pause 3 seconds
['OK', 'OK', 'OK']

See more examples in this demonstration and in the documentation.
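If you would rather avoid the dependency, the same idea can be sketched with the standard library alone: keep at most limit tasks in flight and pull the next chunk from the generator only when one of them finishes. This is a minimal sketch, not aiostream's implementation; the helper name lazy_gather and its limit parameter are hypothetical, and split/process_chunk below are simplified stand-ins for the question's functions.

```python
import asyncio

async def lazy_gather(func, iterable, limit):
    # Hypothetical helper (not part of asyncio or aiostream): run func(item)
    # for every item with at most `limit` calls in flight (limit >= 1),
    # advancing the iterator only when a slot frees up.
    # Results are returned in input order.
    items = enumerate(iterable)
    pending = {}  # running task -> position of its item in the input
    results = {}  # position -> result

    def start_next():
        # Pull exactly one more item from the lazy iterator, if any is left
        nxt = next(items, None)
        if nxt is None:
            return False
        index, item = nxt
        pending[asyncio.ensure_future(func(item))] = index
        return True

    # Fill the initial slots
    while len(pending) < limit and start_next():
        pass

    while pending:
        done, _ = await asyncio.wait(set(pending), return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            results[pending.pop(task)] = task.result()
            start_next()  # refill the freed slot with the next chunk
    return [results[i] for i in range(len(results))]


log = []  # records the order of events for the demo below

def split(sequence):
    # Simplified stand-in for the question's chunker
    for x in sequence:
        log.append(f'chunk {x}')
        yield x

async def process_chunk(chunk):
    # Simplified stand-in for the question's processor (no semaphore needed)
    log.append(f'process {chunk}')
    await asyncio.sleep(0.1)
    return 'OK ' + chunk

results = asyncio.run(lazy_gather(process_chunk, split('ABC'), limit=2))
print(results)  # ['OK A', 'OK B', 'OK C']
print(log)      # 'chunk C' is pulled only after A and B have started processing
```

Refilling a slot right after a task completes is what makes this lazy: at most limit chunks exist ahead of processing, much like the Semaphore(2) in the question. Two caveats of this sketch: a synchronous chunker still blocks the event loop while it computes each chunk, and if a task raises, task.result() re-raises while the remaining tasks keep running unobserved.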