Jon*_*rin 19 python generator python-asyncio
我正在尝试使用Python生成器重现反应式扩展“共享”的可观察概念。
假设我有一个API,可以让我像这样使用无限流:
def my_generator():
for elem in the_infinite_stream():
yield elem
Run Code Online (Sandbox Code Playgroud)
我可以多次使用此生成器,如下所示:
stream1 = my_generator()
stream2 = my_generator()
Run Code Online (Sandbox Code Playgroud)
并且the_infinite_stream()将被调用两次(每个生成器一次)。
现在说这the_infinite_stream()是一项昂贵的手术。有没有办法在多个客户端之间“共享”生成器?似乎tee可以做到,但是我必须提前知道我想要多少个独立的生成器。
这个想法是,在其他使用反应性扩展(RxJava,RxSwift)“共享”流的语言(Java,Swift)中,我可以方便地在客户端上复制该流。我想知道如何在Python中做到这一点。
注意:我正在使用asyncio
我接受了tee实现并对其进行了修改,这样您可以从中使用各种生成器infinite_stream:
import collections
def generators_factory(iterable):
it = iter(iterable)
deques = []
already_gone = []
def new_generator():
new_deque = collections.deque()
new_deque.extend(already_gone)
deques.append(new_deque)
def gen(mydeque):
while True:
if not mydeque: # when the local deque is empty
newval = next(it) # fetch a new value and
already_gone.append(newval)
for d in deques: # load it to all the deques
d.append(newval)
yield mydeque.popleft()
return gen(new_deque)
return new_generator
# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1
Run Code Online (Sandbox Code Playgroud)
要仅缓存一些值,您可以更改already_gone = []为already_gone = collections.deque(maxlen=size)并将size=None参数添加到generators_factory。
| 归档时间: |
|
| 查看次数: |
637 次 |
| 最近记录: |