异步python itertools链多个生成器

Ard*_*dhi 2 python asynchronous sequence-generators python-3.x python-asyncio

有关更新的问题:

假设我有2个处理生成器函数:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6
Run Code Online (Sandbox Code Playgroud)

我可以用itertools链接它们

from itertools import chain

mix = chain(gen1(), gen2())
Run Code Online (Sandbox Code Playgroud)

然后我可以用它创建另一个生成器函数对象,

def mix_yield():
   for item in mix:
      yield item
Run Code Online (Sandbox Code Playgroud)

或者只是如果我只是想next(mix),它就在那里。

我的问题是,我该如何做异步代码中的等效代码?

因为我需要它来:

  • 返回收益率(一个接一个),或使用next迭代器
  • 最快的解决方案收益率最高(异步)

上一页 更新:

经过实验和研究后,我发现aiostream库声明为itertools的异步版本,因此我做了什么:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item
Run Code Online (Sandbox Code Playgroud)

但我还是做不到 next(a_mix)

TypeError: 'merge' object is not an iterator
Run Code Online (Sandbox Code Playgroud)

要么 next(await a_mix)

raise StreamEmpty()
Run Code Online (Sandbox Code Playgroud)

尽管我仍然可以将其列入列表:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]
Run Code Online (Sandbox Code Playgroud)

所以一个目标完成了,还有一个目标:

  • 返回收益率(一个接一个),或使用next迭代器

    -首先以最快的速度解决问题(异步)

use*_*342 6

Python的next内置函数只是__next__在对象上调用基础方法的便捷方法。异步等效项__next____anext__异步迭代器上的方法。没有anext全局函数,但是可以很容易地编写它:

async def anext(aiterator):
    return await aiterator.__anext__()
Run Code Online (Sandbox Code Playgroud)

但是节省下来的钱很小,以至于在极少数情况下需要时,最好__anext__直接调用。异步迭代器又可以通过调用(类似于常规可迭代对象提供的)从异步可迭代对象获得。手动驱动的异步迭代如下所示:__aiter____iter__

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...
Run Code Online (Sandbox Code Playgroud)

__anext__StopAsyncIteration当没有更多元素可用时将增加。要遍历异步迭代器,应使用async for

这是一个基于您的代码的可运行示例,同时使用__anext__async for来耗尽用设置的流aiostream.stream.combine.merge

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

  • 只是提到在 aiostream 中输入流上下文意味着使用 `async with zs.stream() as streamer:`,如本 [演示](https://aiostream.readthedocs.io/en/latest/演示文稿.html#演示)。 (2认同)