通过回调将函数转换为Python生成器?

mar*_*ius 30 python generator coroutine

Scipy最小化函数(仅用作示例),可以选择在每一步添加回调函数.所以我可以做点什么,

def my_callback(x):
    print x
scipy.optimize.fmin(func, x0, callback=my_callback)
Run Code Online (Sandbox Code Playgroud)

有没有办法使用回调函数来创建fmin的生成器版本,这样我才能做到,

for x in my_fmin(func,x0):
    print x
Run Code Online (Sandbox Code Playgroud)

似乎可能有一些产量和发送的组合,但我可以想到任何事情.

mgi*_*nbr 16

正如评论中所指出的,你可以在一个新的线程中使用Queue.缺点是您仍然需要某种方式来访问最终结果(最后fmin返回的结果).我下面的例子使用一个可选的回调来做一些事情(另一种选择就是产生它,尽管你的调用代码必须区分迭代结果和最终结果):

from thread import start_new_thread
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
Run Code Online (Sandbox Code Playgroud)

更新:为了阻止下一次迭代的执行,直到消费者处理完最后一次迭代,还需要使用task_donejoin.

    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start
Run Code Online (Sandbox Code Playgroud)

请注意,这maxsize=1不是必需的,因为在消耗最后一个项目之前,不会将新项目添加到队列中.

更新2:另请注意,除非此生成器最终检索到所有项目,否则创建的线程将死锁(它将永久阻塞并且其资源永远不会被释放).生产者正在等待队列,因为它存储了对该队列的引用,即使消费者是gc,它也永远不会被gc回收.然后队列将无法访问,因此没有人能够释放锁.

如果可能的话,一个干净的解决方案是未知的(因为它将取决于所使用的特定功能fmin).可以使用一种解决方法timeout,如果生成器put长时间引发一个异常:

    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done()                   # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start
Run Code Online (Sandbox Code Playgroud)

  • 非常好.要解决@ brice的问题,请使用`Queue(maxsize = 1)`和`q.put(x,block = True)`.否则我看不出任何问题. (2认同)

Jan*_*sky 7

生成器为协程(无线程)

让我们FakeFtpretrbinary使用回调函数被调用数据的程序块的每成功读取:

class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)
Run Code Online (Sandbox Code Playgroud)

使用简单的回调函数有一个缺点,即它被重复调用,并且回调函数无法轻松地保持两次调用之间的上下文。

以下代码定义了process_chunks生成器,它将能够一个接一个地接收数据块并对其进行处理。与简单的回调不同,在这里我们能够将所有处理保持在一个函数内而不会丢失上下文。

from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        coroutine.next()
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")
Run Code Online (Sandbox Code Playgroud)

要查看运行中的代码,请放置FakeFtp类,上面和下面的代码所示的行:

main()
Run Code Online (Sandbox Code Playgroud)

到一个文件中并调用它:

$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
Run Code Online (Sandbox Code Playgroud)

怎么运行的

processed = []此处仅显示,生成器process_chunks与它的外部环境配合时将没有任何问题。全部包裹起来def main():即可证明,不需要使用全局变量。

def process_chunks()是解决方案的核心。它可能有一个shot输入参数(此处未使用),但要点是,它接收输入的是每yield行将任何人通过其返回的内容返回.send(data)到此生成器的实例。coroutine.send(chunk)但是可以在此示例中通过引用此函数的回调来完成callback.send

注意,在实际解决方案yield中,在代码中包含多个s 是没有问题的,它们是一个接一个地处理的。例如,可以使用它来读取(和忽略)CSV文件的标头,然后继续处理带有数据的记录。

我们可以实例化并使用生成器,如下所示:

coroutine = process_chunks()
# Get the coroutine to the first yield
coroutine.next()

ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`

# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
Run Code Online (Sandbox Code Playgroud)

真正的代码是使用contextlib closing上下文管理器确保coroutine.close()始终被调用的。

结论

此解决方案未提供某种迭代器来使用传统样式“来自外部”的数据。另一方面,我们能够:

  • 从内部使用发电机
  • 将所有迭代处理保持在一个函数内,而不会在回调之间被中断
  • 可以选择使用外部上下文
  • 向外界提供有用的结果
  • 所有这些都可以在不使用线程的情况下完成

鸣谢:该解决方案的主要灵感来自于user2357112编写的Python FTP“块”迭代器(无需将整个文件加载到内存)

  • @JanVlcinsky 哦,是的,我明白了。所以我认为这个答案的目的主要是为了生成器的使用,而不是为了回答提问者的问题:`for x in my_fmin(func,x0): print x`。毕竟,如果我们将“processed”列表的数据写入文件或其他流,我们无法通过如上所示的“for”循环来迭代它。尽管如此,这仍然是一个很好的答案。 (2认同)

bri*_*ice 6

概念使用阻塞队列maxsize=1和生产者/消费者模型.

回调产生,然后对回调的下一次调用将阻塞整个队列.

然后,使用者从队列中获取值,尝试获取另一个值,并在读取时阻塞.

生产者被允许推入队列,冲洗并重复.

用法:

def dummy(func, arg, callback=None):
  for i in range(100):
    callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
  print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
   print(i)
Run Code Online (Sandbox Code Playgroud)

可以按预期用于迭代器:

for i in take(5, Iteratorize(dummy, lambda x: x+1, 0)):
  print(i)
Run Code Online (Sandbox Code Playgroud)

迭代课:

from thread import start_new_thread
from Queue import Queue

class Iteratorize:
  """ 
  Transforms a function that takes a callback 
  into a lazy iterator (generator).
  """
  def __init__(self, func, ifunc, arg, callback=None):
    self.mfunc=func
    self.ifunc=ifunc
    self.c_callback=callback
    self.q = Queue(maxsize=1)
    self.stored_arg=arg
    self.sentinel = object()

    def _callback(val):
      self.q.put(val)

    def gentask():
      ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
      self.q.put(self.sentinel)
      if self.c_callback:
        self.c_callback(ret)

    start_new_thread(gentask, ())

  def __iter__(self):
    return self

  def next(self):
    obj = self.q.get(True,None)
    if obj is self.sentinel:
     raise StopIteration 
    else:
      return obj
Run Code Online (Sandbox Code Playgroud)

可能可以做一些清理接受*args**kwargs被包装的功能和/或最终结果回调.