共享的python生成器

Jon*_*rin 19 python generator python-asyncio

我正在尝试使用Python生成器重现反应式扩展“共享”的可观察概念。

假设我有一个API,可以让我像这样使用无限流:

def my_generator():
    for elem in the_infinite_stream():
        yield elem
Run Code Online (Sandbox Code Playgroud)

我可以多次使用此生成器,如下所示:

stream1 = my_generator()
stream2 = my_generator()
Run Code Online (Sandbox Code Playgroud)

并且the_infinite_stream()将被调用两次(每个生成器一次)。

现在说这the_infinite_stream()是一项昂贵的手术。有没有办法在多个客户端之间“共享”生成器?似乎tee可以做到,但是我必须提前知道我想要多少个独立的生成器。

这个想法是,在其他使用反应性扩展(RxJava,RxSwift)“共享”流的语言(Java,Swift)中,我可以方便地在客户端上复制该流。我想知道如何在Python中做到这一点。

注意:我正在使用asyncio

san*_*ash 9

我接受了tee实现并对其进行了修改,这样您可以从中使用各种生成器infinite_stream

import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []
    already_gone = []

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1
Run Code Online (Sandbox Code Playgroud)

要仅缓存一些值,您可以更改already_gone = []already_gone = collections.deque(maxlen=size)并将size=None参数添加到generators_factory