vau*_*tah 2 python python-asyncio
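I'm using Python's asyncio module with async/await to process a character sequence in chunks concurrently and collect the results in a list. To do this I use a chunking function (`split`) and a chunk-processing function (`process_chunk`). Both come from a third-party library, and I'd rather not change them.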
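Chunking is slow and the number of chunks isn't known in advance, which is why I don't want to consume the whole chunk generator at once. Ideally, the code should advance the generator in step with `process_chunk`'s semaphore, i.e. each time that function returns.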
My code:
```python
import asyncio

def split(sequence):
    for x in sequence:
        print('Getting the next chunk:', x)
        yield x
    print('Finished chunking')

async def process_chunk(chunk, *, semaphore=asyncio.Semaphore(2)):
    async with semaphore:
        print('Processing chunk:', chunk)
        await asyncio.sleep(3)
        return 'OK'

async def process_in_chunks(sequence):
    gen = split(sequence)
    # The list comprehension drains the whole generator before gather() runs anything
    coro = [process_chunk(chunk) for chunk in gen]
    results = await asyncio.gather(*coro)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_in_chunks('ABC'))
```
It works, and prints:

```
Getting the next chunk: A
Getting the next chunk: B
Getting the next chunk: C
Finished chunking
Processing chunk: A
Processing chunk: B
Processing chunk: C
```
The catch is that the `gen` generator is exhausted before any processing begins. I know why this happens, but how can I change it?
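To make the draining concrete, here is a minimal sketch. It assumes hypothetical stand-ins for `split` and `process_chunk` that append to an `events` list instead of printing and sleeping for 3 seconds, and `eager` is an illustrative name. The point it shows: the list comprehension calls `next()` until the generator is exhausted, while calling `process_chunk(chunk)` only creates a coroutine object whose body does not run until `gather()` schedules it.

```python
import asyncio

events = []  # records the order in which things happen

def split(sequence):
    # Hypothetical stand-in for the third-party chunker.
    for x in sequence:
        events.append(f"get {x}")
        yield x
    events.append("done chunking")

async def process_chunk(chunk):
    # Hypothetical stand-in for the third-party worker.
    events.append(f"process {chunk}")
    await asyncio.sleep(0)
    return "OK"

async def eager(sequence):
    # The comprehension pulls every chunk first; each process_chunk(...) call
    # only builds a coroutine object, nothing inside it has run yet.
    coros = [process_chunk(chunk) for chunk in split(sequence)]
    events.append(f"built {len(coros)} coroutines")
    # Only now do the coroutines start running, wrapped in tasks by gather().
    return await asyncio.gather(*coros)

results = asyncio.run(eager("ABC"))
print(events)
```

Every "get" event, plus "done chunking", is recorded before the first "process" event, which is exactly the eager consumption described in the question.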
If you don't mind an external dependency, you can use `aiostream.stream.map`:
```python
from aiostream import stream, pipe

async def process_in_chunks(sequence):
    # Asynchronous sequence of chunks
    xs = stream.iterate(split(sequence))
    # Asynchronous sequence of results
    ys = xs | pipe.map(process_chunk, task_limit=2)
    # Aggregation of the results into a list
    zs = ys | pipe.list()
    # Run the stream
    results = await zs
    print(results)
```
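The chunks are produced lazily and fed to the `process_chunk` coroutine. The number of coroutines running concurrently is controlled by `task_limit`, so `process_chunk` no longer needs its semaphore.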
Output:
```
Getting the next chunk: A
Processing chunk: A
Getting the next chunk: B
Processing chunk: B
# Pause 3 seconds
Getting the next chunk: C
Processing chunk: C
Finished chunking
# Pause 3 seconds
['OK', 'OK', 'OK']
```
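If the extra dependency is not an option, a similar lazy, bounded behavior can be sketched with the standard library alone. This is a swapped-in technique, not how aiostream works internally: take a slot from an `asyncio.Semaphore` *before* calling `next()` on the generator, and give the slot back when that chunk's task finishes. The `split` and `process_chunk` below are hypothetical stand-ins that log to an `events` list, and `process_in_chunks_lazy` and its `limit` parameter are illustrative names, not part of any library.

```python
import asyncio

events = []  # records starts/ends so the concurrency can be inspected

def split(sequence):
    # Hypothetical stand-in for the third-party chunker.
    for x in sequence:
        events.append(f"get {x}")
        yield x
    events.append("done chunking")

async def process_chunk(chunk):
    # Hypothetical stand-in for the third-party worker (no semaphore of its own).
    events.append(f"start {chunk}")
    await asyncio.sleep(0.05)
    events.append(f"end {chunk}")
    return "OK"

async def process_in_chunks_lazy(sequence, limit=2):
    slots = asyncio.Semaphore(limit)  # created inside the running loop

    async def run(chunk):
        try:
            return await process_chunk(chunk)
        finally:
            slots.release()  # free the slot so the next chunk can be pulled

    gen = split(sequence)
    tasks = []
    while True:
        await slots.acquire()  # wait for a free slot before advancing the generator
        try:
            chunk = next(gen)
        except StopIteration:
            slots.release()  # no chunk left, hand the unused slot back
            break
        tasks.append(asyncio.create_task(run(chunk)))
    # gather() returns results in the order the tasks were created
    return await asyncio.gather(*tasks)

results = asyncio.run(process_in_chunks_lazy("ABC"))
print(results)
print(events)
```

Because the slot is acquired before `next(gen)` is called, the third chunk is only requested after one of the first two has finished, so the generator advances each time `process_chunk` returns. With the question's real `process_chunk`, its own `Semaphore(2)` would simply act as a second, equal limit.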