Question by mar*_*ius, score 30. Tags: python, generator, coroutine
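The SciPy minimization functions (used here just as an example) let you add a callback function that is called at each step. So I can do something like: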
import scipy.optimize

def my_callback(x):
    print x

scipy.optimize.fmin(func, x0, callback=my_callback)
Is there a way to use the callback function to create a generator version of fmin, so that I could do:
for x in my_fmin(func,x0):
    print x
It seems like it might be possible with some combination of yield and send, but I can't quite think of anything.
Answer by mgi*_*nbr, score 16
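As pointed out in the comments, you could do it in a new thread, using a Queue. The drawback is that you'd still need some way to access the final result (what fmin returns at the end). My example below uses an optional callback to do something with it (another option would be to just yield it too, though your calling code would then have to distinguish between iteration results and the final result):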
import scipy.optimize
from thread import start_new_thread  # Python 2 module names (Python 3: _thread, queue)
from Queue import Queue

def my_fmin(func, x0, end_callback=(lambda x:x), timeout=None):

    q = Queue() # fmin produces, the generator consumes
    job_done = object() # signals the processing is done

    # Producer
    def my_callback(x):
        q.put(x)
    def task():
        ret = scipy.optimize.fmin(func,x0,callback=my_callback)
        q.put(job_done)
        end_callback(ret) # "Returns" the result of the main call

    # Starts fmin in a new thread
    start_new_thread(task,())

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
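For Python 3, where the thread and Queue modules became _thread and queue, here is a minimal sketch of the same idea built on threading.Thread. It is generalized slightly and is not the answer's own code: run(callback) is a hypothetical stand-in for the callback-driven call (for fmin it would be lambda cb: scipy.optimize.fmin(func, x0, callback=cb)), and q.put itself serves as the callback.

```python
import queue
import threading


def iterate_callbacks(run, end_callback=lambda result: None, timeout=None):
    """Yield every value that run() passes to its callback (Python 3 sketch).

    run(callback) is a hypothetical stand-in for a callback-driven function,
    e.g. lambda cb: scipy.optimize.fmin(func, x0, callback=cb).
    """
    q = queue.Queue()      # run() produces, the generator consumes
    job_done = object()    # sentinel: signals the processing is done

    def task():
        result = run(q.put)    # q.put itself is used as the callback
        q.put(job_done)
        end_callback(result)   # "returns" the final result of run()

    # Daemon thread: a producer still running at exit won't keep the interpreter alive
    threading.Thread(target=task, daemon=True).start()

    while True:
        item = q.get(True, timeout)  # blocks until an item is available
        if item is job_done:
            break
        yield item


def run(callback):
    # Hypothetical callback-driven function: reports three "iterates", then returns
    for x in (3, 2, 1):
        callback(x)
    return "converged"


print(list(iterate_callbacks(run, timeout=5)))  # [3, 2, 1]
```

As in the answer, end_callback runs after the sentinel is queued, so the for loop may finish slightly before the final result has been delivered.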
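Update: to block the execution of the next iteration until the consumer has finished processing the last one, it's also necessary to use task_done and join.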
    # Producer
    def my_callback(x):
        q.put(x)
        q.join() # Blocks until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        if next_item is job_done:
            break
        yield next_item
        q.task_done() # Unblocks the producer, so a new iteration can start
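A minimal Python 3 sketch of this lock-step variant, again assuming a hypothetical run(callback) in place of fmin. The log in the usage shows the effect of q.join() and task_done(): the producer never starts step i+1 before the consumer has finished with step i.

```python
import queue
import threading


def lockstep_callbacks(run, timeout=None):
    """Queue-based generator where run() waits for the consumer at every step.

    run(callback) is a hypothetical stand-in for the callback-driven function.
    """
    q = queue.Queue()
    job_done = object()

    def callback(value):
        q.put(value)
        q.join()  # blocks until the consumer has called task_done()

    def task():
        run(callback)
        q.put(job_done)

    threading.Thread(target=task, daemon=True).start()

    while True:
        item = q.get(True, timeout)  # blocks until an item is available
        if item is job_done:
            break
        yield item
        q.task_done()  # reached only when the consumer asks for the next item


log = []


def run(callback):
    # Hypothetical producer that records when each step starts
    for i in range(3):
        log.append(("produce", i))
        callback(i)


for x in lockstep_callbacks(run, timeout=5):
    log.append(("consume", x))

print(log)
```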
Note that maxsize=1 is not necessary, since no new item will be added to the queue until the last one has been consumed.
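Update 2: Also note that, unless all items are eventually retrieved by this generator, the created thread will deadlock (it will block forever and its resources will never be released). The producer is waiting on the queue, and since it stores a reference to that queue, the queue will never be reclaimed by the gc even if the consumer is. The queue will then become unreachable, so nobody will be able to release the lock.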
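A clean solution for that is unknown, if one is possible at all (since it would depend on the particular function used in place of fmin). A workaround can be made using timeout, having the producer raise an exception if put blocks for too long: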
    q = Queue(maxsize=1)

    # Producer
    def my_callback(x):
        q.put(x)
        q.put("dummy",True,timeout) # Blocks until the first result is retrieved
        q.join() # Blocks again until task_done is called

    # Consumer
    while True:
        next_item = q.get(True,timeout) # Blocks until an input is available
        q.task_done() # (one "task_done" per "get")
        if next_item is job_done:
            break
        yield next_item
        q.get() # Retrieves the "dummy" object (must be after yield)
        q.task_done() # Unblocks the producer, so a new iteration can start
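A Python 3 sketch of the timeout idea, swapping in a simpler scheme than the "dummy" item above: a maxsize=1 queue where every put has a timeout, so the producer runs at most one item ahead rather than strictly in lock-step. run(callback) is a hypothetical stand-in for fmin and is assumed to let exceptions raised by the callback propagate. The trade-off is the one described above: the timeout must exceed the longest legitimate pause on either side, otherwise a slow but live consumer is treated as gone, and a slow producer step makes the consumer's get raise queue.Empty.

```python
import queue
import threading


def abandonable_callbacks(run, timeout=1.0):
    """Queue-based generator whose producer gives up if the consumer disappears.

    run(callback) is a hypothetical stand-in for the callback-driven function;
    it is assumed to let exceptions raised by the callback propagate.
    """
    q = queue.Queue(maxsize=1)  # the producer can be at most one item ahead
    job_done = object()

    def callback(value):
        # Raises queue.Full if nobody takes the previous item within `timeout`
        q.put(value, True, timeout)

    def task():
        try:
            run(callback)
            q.put(job_done, True, timeout)
        except queue.Full:
            pass  # consumer went away: let the thread end instead of blocking forever

    threading.Thread(target=task, daemon=True).start()

    while True:
        item = q.get(True, timeout)  # raises queue.Empty if the producer stalls
        if item is job_done:
            break
        yield item


finished = threading.Event()


def run(callback):
    # Hypothetical producer with many steps; records when it stops
    try:
        for i in range(100):
            callback(i)
    finally:
        finished.set()


gen = abandonable_callbacks(run, timeout=0.2)
print(next(gen))         # 0
gen.close()              # the consumer walks away early
print(finished.wait(5))  # True: the producer thread ended instead of hanging
```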
Let's take FakeFtp, whose retrbinary method calls a callback on each successful read of a chunk of data:
class FakeFtp(object):
    def __init__(self):
        self.data = iter(["aaa", "bbb", "ccc", "ddd"])

    def login(self, user, password):
        self.user = user
        self.password = password

    def retrbinary(self, cmd, cb):
        for chunk in self.data:
            cb(chunk)
Using a simple callback function has the disadvantage that it is called repeatedly, and the callback cannot easily keep context between calls.
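The following code defines the process_chunks generator, which will be able to receive chunks of data one by one and process them. Unlike a simple callback, here we are able to keep all the processing within one function without losing context.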
from contextlib import closing
from itertools import count


def main():
    processed = []

    def process_chunks():
        for i in count():
            try:
                # (repeatedly) get the chunk to process
                chunk = yield
            except GeneratorExit:
                # finish_up
                print("Finishing up.")
                return
            else:
                # Here process the chunk as you like
                print("inside coroutine, processing chunk:", i, chunk)
                product = "processed({i}): {chunk}".format(i=i, chunk=chunk)
                processed.append(product)

    with closing(process_chunks()) as coroutine:
        # Get the coroutine to the first yield
        next(coroutine)
        ftp = FakeFtp()
        # next line repeatedly calls `coroutine.send(data)`
        ftp.retrbinary("RETR binary", cb=coroutine.send)
        # each callback "jumps" to `yield` line in `process_chunks`

    print("processed result", processed)
    print("DONE")
To see the code in action, put the FakeFtp class, the code shown above, and the following line:
main()
into one file and call it:
$ python headsandtails.py
('inside coroutine, processing chunk:', 0, 'aaa')
('inside coroutine, processing chunk:', 1, 'bbb')
('inside coroutine, processing chunk:', 2, 'ccc')
('inside coroutine, processing chunk:', 3, 'ddd')
Finishing up.
('processed result', ['processed(0): aaa', 'processed(1): bbb', 'processed(2): ccc', 'processed(3): ddd'])
DONE
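processed = [] is here just to show that the generator process_chunks has no problem cooperating with its external context. Everything is wrapped in def main(): to prove there is no need to use global variables.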
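def process_chunks() is the core of the solution. It might take one-shot input parameters (not used here), but the main part, where it receives input, is each yield line: it returns whatever anyone sends into this generator instance via .send(data). One could call coroutine.send(chunk) directly, but in this example it is done through a callback that refers to this method, coroutine.send.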
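To isolate that mechanism, a minimal Python 3 sketch (collect is a hypothetical example coroutine, not part of the answer): the value passed to .send() becomes the result of the paused yield expression, and the coroutine must first be advanced to its first yield with next(), which is what the "Get the coroutine to the first yield" step does.

```python
def collect(received):
    """Coroutine that appends every value sent into it to `received`."""
    while True:
        value = yield  # execution pauses here; .send(x) resumes it with value = x
        received.append(value)


received = []
co = collect(received)
next(co)         # "prime" the coroutine: run it up to the first yield
co.send("aaa")   # value = "aaa" inside collect()
co.send("bbb")
co.close()       # raises GeneratorExit at the paused yield, ending the coroutine
print(received)  # ['aaa', 'bbb']
```

Skipping the priming step fails loudly: sending a non-None value to a just-started generator raises TypeError.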
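Note that in a real solution there is no problem having multiple yields in the code; they are processed one after another. This might be used, for example, to read (and ignore) the header of a CSV file and then continue processing the records with data.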
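A minimal Python 3 sketch of that CSV idea, assuming lines arrive one at a time through .send() (process_rows is a hypothetical name): the first yield swallows the header, and the yield inside the loop receives the data lines, each parsed with csv.reader so that quoted commas are handled.

```python
import csv


def process_rows(rows):
    """Coroutine: the first line sent is a CSV header (ignored), the rest are records."""
    yield  # first .send(): receives the header line, which is discarded
    while True:
        line = yield  # every later .send(): one data line
        rows.append(next(csv.reader([line])))  # parse the single CSV line


rows = []
co = process_rows(rows)
next(co)  # advance to the first yield
for line in ["name,score", "ann,3", '"smith, bob",5']:
    co.send(line)
co.close()
print(rows)  # [['ann', '3'], ['smith, bob', '5']]
```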
We could instantiate and use the generator as follows:
coroutine = process_chunks()
# Get the coroutine to the first yield
next(coroutine)
ftp = FakeFtp()
# next line repeatedly calls `coroutine.send(data)`
ftp.retrbinary("RETR binary", cb=coroutine.send)
# each callback "jumps" to `yield` line in `process_chunks`
# close the coroutine (will throw the `GeneratorExit` exception into the
# `process_chunks` coroutine).
coroutine.close()
The real code uses the contextlib closing context manager to ensure that coroutine.close() is always called.
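A small Python 3 sketch of what closing buys (collector is a hypothetical example coroutine): when the with block exits, whether normally or through an exception, close() raises GeneratorExit at the paused yield, so the coroutine's clean-up branch runs.

```python
from contextlib import closing


def collector(log):
    """Coroutine that records sent values, plus a final marker when it is closed."""
    try:
        while True:
            log.append((yield))
    except GeneratorExit:
        log.append("finished")  # clean-up: runs when close() is called


log = []
with closing(collector(log)) as co:
    next(co)  # advance to the first yield
    co.send(1)
    co.send(2)
print(log)  # [1, 2, 'finished']
```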
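This solution does not provide a sort of iterator to consume data from in the traditional "from outside" style. On the other hand, we are able to: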
Credits: the solution is heavily inspired by the answer to 'Python FTP "chunk" iterator (without loading entire file into memory)' written by user2357112.
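Concept: use a blocking queue with maxsize=1 and a producer/consumer model.
The callback produces a value; the next call to the callback then blocks on the full queue.
The consumer then yields the value from the queue, tries to get another value, and blocks on the read.
The producer is allowed to push to the queue; rinse and repeat.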
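To make the blocking behaviour concrete, a tiny Python 3 demo of a maxsize=1 queue. The timeout is there only so the demo terminates instead of hanging; a real producer would simply wait until the consumer reads.

```python
import queue

q = queue.Queue(maxsize=1)  # room for exactly one pending value
q.put("value 1")            # the producer's first put returns immediately

try:
    q.put("value 2", timeout=0.1)  # the queue is full, so this put waits...
    second_put_blocked = False
except queue.Full:                 # ...and gives up, since nobody consumed "value 1"
    second_put_blocked = True

first = q.get()   # the consumer takes the pending value
q.put("value 2")  # now the producer's put goes through at once
second = q.get()
print(second_put_blocked, first, second)  # True value 1 value 2
```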
Usage:
import scipy.optimize

def dummy(func, arg, callback=None):
    for i in range(100):
        callback(func(arg+i))

# Dummy example:
for i in Iteratorize(dummy, lambda x: x+1, 0):
    print(i)

# example with scipy:
for i in Iteratorize(scipy.optimize.fmin, func, x0):
    print(i)
It can be combined with other iterator tools as expected:
from itertools import islice

for i in islice(Iteratorize(dummy, lambda x: x+1, 0), 5):
    print(i)
The Iteratorize class:
from thread import start_new_thread  # Python 2 module names (Python 3: _thread, queue)
from Queue import Queue

class Iteratorize:
    """
    Transforms a function that takes a callback
    into a lazy iterator (generator).
    """
    def __init__(self, func, ifunc, arg, callback=None):
        self.mfunc=func
        self.ifunc=ifunc
        self.c_callback=callback
        self.q = Queue(maxsize=1)
        self.stored_arg=arg
        self.sentinel = object()

        def _callback(val):
            self.q.put(val)

        def gentask():
            ret = self.mfunc(self.ifunc, self.stored_arg, callback=_callback)
            self.q.put(self.sentinel)
            if self.c_callback:
                self.c_callback(ret)

        start_new_thread(gentask, ())

    def __iter__(self):
        return self

    def next(self):
        obj = self.q.get(True,None)
        if obj is self.sentinel:
            raise StopIteration
        else:
            return obj
It could probably use some clean-up to accept *args and **kwargs for the wrapped function and/or the final-result callback.
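One possible shape for that clean-up, as a Python 3 sketch rather than the answer's own code. It assumes func accepts callback= as a keyword argument (scipy.optimize.fmin does), on_result is a hypothetical name for the final-result callback, and exceptions raised by func are re-raised in the consuming thread instead of being lost in the worker thread.

```python
import queue
import threading


class Iteratorize:
    """Wrap func(*args, callback=cb, **kwargs) as a lazy iterator (Python 3 sketch).

    Values passed to the callback are produced one by one; func's return value
    is handed to on_result (if given) before iteration stops.
    """

    def __init__(self, func, *args, on_result=None, **kwargs):
        self._queue = queue.Queue(maxsize=1)  # producer runs at most one item ahead
        self._sentinel = object()             # marks the end of the stream
        self._error = None                    # exception raised by func, if any
        self._finished = False

        def _callback(value):
            self._queue.put(value)  # blocks while the previous value is unread

        def _task():
            try:
                result = func(*args, callback=_callback, **kwargs)
                if on_result is not None:
                    on_result(result)
            except Exception as exc:  # re-raised later in the consumer's thread
                self._error = exc
            finally:
                self._queue.put(self._sentinel)

        threading.Thread(target=_task, daemon=True).start()

    def __iter__(self):
        return self

    def __next__(self):
        if self._finished:
            raise StopIteration
        item = self._queue.get()
        if item is self._sentinel:
            self._finished = True
            if self._error is not None:
                raise self._error
            raise StopIteration
        return item


def dummy(func, arg, callback=None):
    # Same shape as the dummy above, shortened, plus a return value
    for i in range(5):
        callback(func(arg + i))
    return "done"


finals = []
for i in Iteratorize(dummy, lambda x: x + 1, 0, on_result=finals.append):
    print(i)
print(finals)  # ['done']
```

Calling on_result before the end-of-stream sentinel is queued means the final result is already available when the for loop finishes; the original class invokes its callback after the sentinel, so its loop can end first.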