Awaiting Python async generators

mir*_*e2k 3 python-3.x python-asyncio

Suppose I have two async generators:

import asyncio


async def get_rules():
    while True:
        yield 'rule=1'
        # asyncio.sleep() is a coroutine and must be awaited to pause
        await asyncio.sleep(2)


async def get_snapshots():
    while True:
        yield 'snapshot=1'
        await asyncio.sleep(5)

I would like to merge them into a single async generator that yields 2-tuples containing the latest value from each of the two. A sort of combineLatest.

What is the best way to do this?

Vin*_*ent 6

You might want to have a look at aiostream, especially stream.merge and stream.accumulate:

import asyncio
from itertools import count
from aiostream import stream


async def get_rules():
    for x in count():
        await asyncio.sleep(2)
        yield 'rule', x


async def get_snapshots():
    for x in count():
        await asyncio.sleep(5)
        yield 'snapshot', x


async def main():
    xs = stream.merge(get_rules(), get_snapshots())
    ys = stream.map(xs, lambda x: {x[0]: x[1]})
    zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})

    async with zs.stream() as streamer:
        async for z in streamer:
            print(z)


# asyncio.run() (Python 3.7+) creates the event loop, runs main() and closes the loop
asyncio.run(main())

Output:

{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]

See the project page and the documentation for more details.

Disclaimer: I am the project maintainer.
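The question asked for 2-tuples rather than dictionaries. As a minimal sketch, the accumulated dictionaries could be passed through a small plain-Python wrapper. The helper `as_pairs` and the stand-in generator `demo_states` are hypothetical names used only for illustration and are not part of aiostream; the sketch assumes the keys 'rule' and 'snapshot' used in the example above, and it skips states in which one of the two values has not arrived yet.

```python
import asyncio


async def as_pairs(states, first="rule", second="snapshot"):
    """Turn a stream of state dicts into (first, second) tuples.

    Hypothetical helper: states missing either key are skipped.
    """
    async for state in states:
        if first in state and second in state:
            yield state[first], state[second]


async def demo_states():
    # Stand-in for the accumulated dictionaries shown in the output above.
    for state in [{}, {"rule": 0}, {"rule": 1},
                  {"rule": 1, "snapshot": 0}, {"rule": 2, "snapshot": 0}]:
        yield state


async def collect_pairs():
    # Gather every pair into a list so the result is easy to inspect.
    return [pair async for pair in as_pairs(demo_states())]


pairs = asyncio.run(collect_pairs())
print(pairs)  # [(1, 0), (2, 0)]
```

Whether to skip incomplete states or to emit a placeholder such as None for the missing side is a design choice; skipping matches the usual combineLatest behaviour of waiting until every source has produced at least one value.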


mir*_*e2k 1

I came up with this:

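import asyncio

from aiochannel import Channel


async def combine(**generators):
    """Given a bunch of async generators, merges the events from
    all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
    """
    combined = Channel()

    async def listen_and_forward(name, generator):
        # Forward every value to the shared channel, tagged with its name.
        async for value in generator:
            await combined.put({name: value})

    # Start one forwarding task per generator; keep references so the
    # tasks are not garbage-collected while they run.
    tasks = [asyncio.create_task(listen_and_forward(name, generator))
             for name, generator in generators.items()]

    # The channel is never closed, so this loop runs until cancelled.
    async for item in combined:
        yield item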


async def combine_latest(**generators):
    """Like "combine", but always includes the latest value from
    every generator.
    """
    current = {}
    async for value in combine(**generators):
        current.update(value)
        # Yield a copy so that items already handed out are not
        # mutated by later updates.
        yield dict(current)

Call it like this:

async for item in combine_latest(rules=rulesgen, snap=snapgen):
    print(item)

The output looks like this:

{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....

I'm using aiochannel, but a plain asyncio.Queue should work fine too.
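For reference, here is a rough sketch of what the asyncio.Queue variant might look like, using only the standard library. It is an illustration, not the code from this answer: the `_DONE` sentinel, the cleanup in `finally`, and the `ticker` demo generator are assumptions added so that the example terminates, and error handling for failing generators is left out.

```python
import asyncio

_DONE = object()  # sentinel: one source generator is exhausted


async def combine_latest(**generators):
    """Merge named async generators, yielding a dict of the latest values."""
    queue = asyncio.Queue()

    async def forward(name, generator):
        try:
            async for value in generator:
                await queue.put((name, value))
        finally:
            # Always signal completion, even on cancellation.
            queue.put_nowait((name, _DONE))

    tasks = [asyncio.create_task(forward(name, gen))
             for name, gen in generators.items()]
    current = {}
    remaining = len(tasks)
    try:
        while remaining:
            name, value = await queue.get()
            if value is _DONE:
                remaining -= 1
                continue
            current[name] = value
            yield dict(current)  # copy, so earlier items stay unchanged
    finally:
        # Stop the forwarders if the consumer leaves the loop early.
        for task in tasks:
            task.cancel()


async def ticker(label, delay, n):
    # Hypothetical finite source used only for this demo.
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{label}-{i}"


async def main():
    results = []
    async for item in combine_latest(rules=ticker("rule", 0.01, 3),
                                     snap=ticker("snapshot", 0.025, 1)):
        results.append(item)
    return results


results = asyncio.run(main())
print(results[-1])  # {'rules': 'rule-2', 'snap': 'snapshot-0'}
```

The queue here is unbounded, so a slow consumer lets items pile up; passing a maxsize to asyncio.Queue would add backpressure, at the cost of the forwarders blocking on put().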