mir*_*e2k 3 python-3.x python-asyncio
假设我有两个异步生成器:
async def get_rules():
while True:
yield 'rule=1'
asyncio.sleep(2)
async def get_snapshots():
while True:
yield 'snapshot=1'
asyncio.sleep(5)
Run Code Online (Sandbox Code Playgroud)
我想将它们合并到一个异步生成器中,该生成器返回2元组,其中包含两个元组的最新值.排序combineLatest.
做这个的最好方式是什么?
您可能想要查看aiostream,尤其是stream.merge和stream.accumulate:
import asyncio
from itertools import count
from aiostream import stream
async def get_rules():
for x in count():
await asyncio.sleep(2)
yield 'rule', x
async def get_snapshots():
for x in count():
await asyncio.sleep(5)
yield 'snapshot', x
async def main():
xs = stream.merge(get_rules(), get_snapshots())
ys = stream.map(xs, lambda x: {x[0]: x[1]})
zs = stream.accumulate(ys, lambda x, e: {**x, **e}, {})
async with zs.stream() as streamer:
async for z in streamer:
print(z)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Run Code Online (Sandbox Code Playgroud)
输出:
{}
{'rule': 0}
{'rule': 1}
{'rule': 1, 'snapshot': 0}
{'rule': 2, 'snapshot': 0}
[...]
Run Code Online (Sandbox Code Playgroud)
免责声明:我是项目维护者.
我想出了这个:
async def combine(**generators):
"""Given a bunch of async generators, merges the events from
all of them. Each should have a name, i.e. `foo=gen, bar=gen`.
"""
combined = Channel()
async def listen_and_forward(name, generator):
async for value in generator:
await combined.put({name: value})
for name, generator in generators.items():
asyncio.Task(listen_and_forward(name, generator))
async for item in combined:
yield item
async def combine_latest(**generators):
"""Like "combine", but always includes the latest value from
every generator.
"""
current = {}
async for value in combine(**generators):
current.update(value)
yield current
Run Code Online (Sandbox Code Playgroud)
像这样称呼它:
async for item in combine_latest(rules=rulesgen, snap=snapgen):
print(item)
Run Code Online (Sandbox Code Playgroud)
输出看起来像这样:
{'rules': 'rule-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
{'rules': 'rule-1', 'snap': 'snapshot-1'}
....
Run Code Online (Sandbox Code Playgroud)
我正在使用 aiochannel,但普通的 asyncio.Queue 也应该没问题。
| 归档时间: |
|
| 查看次数: |
1576 次 |
| 最近记录: |