Python生成器的多个客户端?

moo*_*eep 2 python generator coroutine

作为这个问题的后续,我试图绕过range(int(1e8))使用生成器的例子列表xrange(int(1e8)).其中xrange只是一个产生长序列值的过程的例子.(请假设它不能轻易复制.)还有一些背景知识,我有一长串的时间戳/值对,我想对它进行一些处理(有时间序列).我试图避免将这些内容整体记录到内存中,因为这是很多数据.

我认为如果我可以将多个处理单元同时应用于我的生成器生成的数据流,那将会很酷.第一个想法是使用itertools.tee(),例如:

from itertools import tee
g1,g2 = tee(xrange(int(1e8)),2)
sum(g1), sum(g2)
Run Code Online (Sandbox Code Playgroud)

但后来我发现只有第一个sum()会使用生成器,而tee()内部list再次构建一个(我想避免它).

所以我想,我需要一个异步解决方案,即允许每个sum()都在每个生成器步骤进行更新的解决方案.想到的东西在哪里

但是我之前没有真正使用过,部分我甚至无法判断这些方法是否有效,或者是有效/高效/高效的.

从这一点来说,我很乐意感谢观众的任何建议!


更新

我想避免使用基于回调的解决方案,因为它显着地降低了性能(这是它当前实现的方式).我在下面添加了一些分析(如果测试不客观,请添加注释):

class SinkA:
  def __init__(self, src):
    for i in src: pass

class SinkB:
  def f(self,i):
    pass

class Source:
  def __iter__(self):
    for i in xrange(int(1e4)):
      yield i

def t1():
  src = Source()
  snk = SinkA(src)

def t2():
  src = Source()
  snk = SinkB()
  for i in src: snk.f(i)

if __name__ == "__main__":
    from timeit import Timer
    n = 1000
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass
Run Code Online (Sandbox Code Playgroud)

更新2

我还能说什么呢?我有这个基于回调的解决方案,看起来效率很低.基于生成器的方法看起来很有前途,但我对这种编程的经验太少,特别是当它涉及更复杂的事物如协同程序或扭曲的库时.总而言之,我有一个生成大量数据的流程的多个消费者,我发现了一些潜在的方法.现在我正在寻找有经验的用户可能已经完成类似任务的合格声明.处理哪种方法可能合适的方式,方法如何相互关联.或者我可能错过了其他什么方法.

ber*_*eal 6

作为一种通用的方法,我会用回调替换生成器的拉模型,并且可能包装生成器,如下所示:

def walk(gen, callbacks):
    for item in gen:
        for f in callbacks:
            f(item)
Run Code Online (Sandbox Code Playgroud)

如果您的处理器位于不同的线程中并且您希望它们在等待时阻塞,则可以注册Queue.put(或任何等效的)作为每个处理器的回调,并独立轮询这些队列.如果需要,这将允许您使用广播和工作池模型.

编辑

另一个解决方案是使用协同程序:

def source(self, *dests):
    for i in xrange(int(1e4)):
        for dest in dests:
            dest.send(i)

def sink():
    while True:
        i = yield

def t3():
    snk = sink()
    snk.next() # activate the coroutine
    source(snk)

if __name__ == '__main__':

    from timeit import Timer
    n = 1000
    t = Timer("t3()", "from __main__ import source, sink, t3")
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass
Run Code Online (Sandbox Code Playgroud)

看起来足够快.基本上,协程是倒置的发生器,你从发电机拉,推到协程.