管道多个消费者的迭代器?

Fra*_*ila 8 python

是否有可能跨越多个消费者"管道"生成器的消耗?

例如,使用此模式的代码很常见:

def consumer1(iterator):
    for item in iterator:
        foo(item)

def consumer2(iterator):
    for item in iterator:
        bar(item)

myiter = list(big_generator())
v1 = consumer1(myiter)
v2 = consumer2(myiter)
Run Code Online (Sandbox Code Playgroud)

在这种情况下,多个函数完全使用相同的迭代器,因此必须将迭代器缓存在列表中.由于每个消费者都耗尽了迭代器,itertools.tee因此没用.

我看到这样的代码很多,我总是希望我能让消费者按顺序一次消耗一个项目,而不是缓存整个迭代器.例如:

  1. consumer1 消耗 myiter[0]
  2. consumer2 消耗 myiter[0]
  3. consumer1 消耗 myiter[1]
  4. consumer2 消耗 myiter[1]
  5. 等等...

如果我要编写语法,它看起来像这样:

c1_retval, c2_retval = iforkjoin(big_generator(), (consumer1, consumer2))
Run Code Online (Sandbox Code Playgroud)

您可以通过线程或多处理和teed迭代器来接近,但线程以不同的速度消耗,这意味着内部缓存的值deque tee可能会变得非常大.这里的要点不是利用并行性或加速任务,而是避免缓存迭代器的大部分.

在我看来,如果不修改消费者,这可能是不可能的,因为控制流程在消费者中.但是,当一个消费者实际使用迭代器控件传递给迭代器的next()方法时,也许有可能以某种方式反转控制流,以便迭代器一次阻塞一个消费者,直到它可以全部提供它们?

如果这是可能的,我不够聪明,不知道如何.有任何想法吗?

shx*_*hx2 1

由于不更改消费者代码(即其中有循环)的限制,您只剩下两个选择:

  1. 您已经在问题中包含的方法:将生成的项目缓存在内存中,然后多次迭代它们。
  2. 在一个线程中运行每个消费者,并实现某种同步itertools.tee,其缓冲区大小为1,它会阻止服务项目,i+1直到项目i已提供给所有消费者。

没有其他选择。您无法实现以下所有目标,因为它们是相互矛盾的:

  1. 有发电机
  2. 有一个循环来消耗所有它
  3. 然后,(连续地)在前一个循环完成,有另一个循环再次消耗所有内容
  4. 在使用它们时仅将 O(1) 项保留在内存(或磁盘等)中
  5. 不重新生成(即不重新创建生成器)

如果您想重复使用生成的项目,则必须将其存储在某个地方。

如果更改消费者的代码是可以接受的,那么显然 @monkey 的解决方案是最简单、最直接的。