Joining multiple async generators in Python

And*_*kin 3 python python-asyncio

I want to listen for events from multiple instances of the same object and then merge these event streams into a single stream. For example, using async generators:

import asyncio

class PeriodicYielder:
    def __init__(self, period: int) -> None:
        self.period = period

    async def updates(self):
        while True:
            await asyncio.sleep(self.period)
            yield self.period

I can successfully listen for events from a single instance:

async def get_updates_from_one(): 
    each_1 = PeriodicYielder(1) 
    async for n in each_1.updates(): 
        print(n)
# 1
# 1
# 1
# ...

But how can I get events from multiple async generators? In other words: how can I iterate over several async generators in the order in which they are ready to produce their next value?

async def get_updates_from_multiple(): 
    each_1 = PeriodicYielder(1) 
    each_2 = PeriodicYielder(2) 
    async for n in magic_async_join_function(each_1.updates(), each_2.updates()): 
        print(n)
# 1
# 1
# 2
# 1
# 1
# 2
# ...

Is there such a magic_async_join_function in the stdlib or in a third-party module?

Mik*_*mov 6

You can use the wonderful aiostream library. It would look like this:

import asyncio
from aiostream import stream


async def test1():
    for _ in range(5):
        await asyncio.sleep(0.1)
        yield 1


async def test2():
    for _ in range(5):
        await asyncio.sleep(0.2)
        yield 2


async def main():
    combine = stream.merge(test1(), test2())

    async with combine.stream() as streamer:
        async for item in streamer:
            print(item)


asyncio.run(main())

Result:

1
1
2
1
1
2
1
2
2
2


use*_*342 5

If you want to avoid the dependency on an external library (or as a learning exercise), you can merge the async iterators using a queue:

def merge_async_iters(*aiters):
    # merge async iterators, proof of concept
    queue = asyncio.Queue(1)
    async def drain(aiter):
        async for item in aiter:
            await queue.put(item)
    async def merged():
        while not all(task.done() for task in tasks):
            yield await queue.get()
    tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
    return merged()
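A minimal usage sketch of this proof-of-concept version (the `ticker` helper and the sleep timings are illustrative, not part of the answer; the function itself is repeated so the snippet runs standalone):

```python
import asyncio

def merge_async_iters(*aiters):
    # merge async iterators, proof of concept (same as above)
    queue = asyncio.Queue(1)
    async def drain(aiter):
        async for item in aiter:
            await queue.put(item)
    async def merged():
        while not all(task.done() for task in tasks):
            yield await queue.get()
    tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
    return merged()

async def ticker(value, period, count):
    # hypothetical source: yields `value` every `period` seconds, `count` times
    for _ in range(count):
        await asyncio.sleep(period)
        yield value

async def main():
    results = []
    async for item in merge_async_iters(ticker(1, 0.1, 3), ticker(2, 0.2, 3)):
        results.append(item)
    return results

results = asyncio.run(main())
print(results)
```

Each value is yielded as soon as its source produces it, so the exact interleaving depends on timing, but all six values arrive and the merged stream ends once every drain task has finished.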

This passes the test from Mikhail's answer, but it isn't perfect: it doesn't propagate the exception if one of the async iterators raises. Also, if the task that exhausts the merged generator returned by merge_async_iters() gets cancelled, or if that generator is never exhausted to the end, the individual drain tasks are left hanging.

A more complete version could handle the first problem by detecting an exception and transmitting it through the queue. The second problem can be resolved by having the merged generator cancel the drain tasks as soon as the iteration is abandoned. With those changes, the resulting code looks like this:

def merge_async_iters(*aiters):
    queue = asyncio.Queue(1)
    run_count = len(aiters)
    cancelling = False

    async def drain(aiter):
        nonlocal run_count
        try:
            async for item in aiter:
                await queue.put((False, item))
        except Exception as e:
            if not cancelling:
                await queue.put((True, e))
            else:
                raise
        finally:
            run_count -= 1

    async def merged():
        try:
            while run_count:
                raised, next_item = await queue.get()
                if raised:
                    cancel_tasks()
                    raise next_item
                yield next_item
        finally:
            cancel_tasks()

    def cancel_tasks():
        nonlocal cancelling
        cancelling = True
        for t in tasks:
            t.cancel()

    tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
    return merged()
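A quick check of the completed version (the `fail_after` and `steady` sources are hypothetical, made up for this demonstration; the function is repeated so the snippet runs standalone): an exception raised in one source now propagates to the consumer, and the remaining drain tasks are cancelled rather than left hanging:

```python
import asyncio

def merge_async_iters(*aiters):
    # same implementation as above
    queue = asyncio.Queue(1)
    run_count = len(aiters)
    cancelling = False

    async def drain(aiter):
        nonlocal run_count
        try:
            async for item in aiter:
                await queue.put((False, item))
        except Exception as e:
            if not cancelling:
                await queue.put((True, e))
            else:
                raise
        finally:
            run_count -= 1

    async def merged():
        try:
            while run_count:
                raised, next_item = await queue.get()
                if raised:
                    cancel_tasks()
                    raise next_item
                yield next_item
        finally:
            cancel_tasks()

    def cancel_tasks():
        nonlocal cancelling
        cancelling = True
        for t in tasks:
            t.cancel()

    tasks = [asyncio.create_task(drain(aiter)) for aiter in aiters]
    return merged()

async def fail_after(n):
    # hypothetical source: yields 0..n-1, then raises
    for i in range(n):
        await asyncio.sleep(0.05)
        yield i
    raise RuntimeError("source failed")

async def steady():
    # hypothetical source that would otherwise run forever
    while True:
        await asyncio.sleep(0.02)
        yield "tick"

async def main():
    seen = []
    try:
        async for item in merge_async_iters(fail_after(2), steady()):
            seen.append(item)
    except RuntimeError as e:
        return seen, str(e)
    return seen, None

seen, err = asyncio.run(main())
print(err)
```

The RuntimeError travels through the queue as a `(True, e)` tuple, is re-raised out of the merged generator into the consumer's `async for`, and the `finally`/`cancel_tasks` path stops the infinite `steady` source.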

Different approaches to merging async iterators can be found in this answer, and also this one, where the latter allows adding new streams mid-stride. The complexity and subtlety of these implementations show that, while it is useful to know how to write one, actually doing so is best left to well-tested external libraries such as aiostream, which cover all the corner cases.