伙计们,我彻底搞糊涂了,所以我甚至不能正确地问问题,但是这里有:
我有一个使用inlineCallbacks的扭曲应用程序.现在我需要定义一个迭代器,这意味着生成器返回给调用者.但是,迭代器不能用inlineCallbacks装饰,可以吗?如果没有,那我该怎么做我这样的代码.
只是为了澄清:目标是process_loop需要每次调用,比方说5秒,它只能处理一大块,比方说10,然后它必须放手.但是,要知道10块(存储在缓存中,这是dict的dict),它需要调用一个返回延迟的函数.
@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
for cachename,cachevalue in cached.items():
result = yield (call func here which returns deferred)
if result is True:
for k,v in cachedvalue.items():
yield cachename, k, v
@inlineCallbacks
def process_chunk(myiter, num):
try:
for i in xrange(num):
nextval = myiter.next()
yield some_processing(nextval)
returnValue(False)
except StopIteration:
returnValue(True)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
result = yield process_chunk(myiter, 10)
if not result:
print 'More left'
reactor.callLater(5, process_loop, cached)
else:
print 'All done'
Run Code Online (Sandbox Code Playgroud)
Jea*_*one 12
你是不对的,你不能表达你想要表达的内容cacheiter.该inlineCallbacks装饰不会让你有一个返回一个迭代器的功能.如果用它装饰一个函数,那么结果就是一个总是返回一个函数的函数Deferred.这就是它的用途.
造成这种困难的部分原因是迭代器不能很好地处理异步代码.如果有一个Deferred涉及生成迭代器的元素,那么从迭代器中出来的元素将首先是Deferreds.
你可以做这样的事情来解释:
@inlineCallbacks
def process_work():
for element_deferred in some_jobs:
element = yield element_deferred
work_on(element)
Run Code Online (Sandbox Code Playgroud)
这可以工作,但看起来特别奇怪.由于生成器只能屈服于它们的调用者(例如,不是它们的调用者的调用者),因此some_jobs迭代器无法对此做任何事情; 只有词法内的代码process_work可以产生一个延迟到提供的inlineCallbacks蹦床等待.
如果您不介意这种模式,那么我们可以将您的代码映像为:
from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor
class cacheiter(object):
def __init__(self, cached):
self._cached = iter(cached.items())
self._remaining = []
def __iter__(self):
return self
@inlineCallbacks
def next(self):
# First re-fill the list of synchronously-producable values if it is empty
if not self._remaining:
for name, value in self._cached:
# Wait on this Deferred to determine if this cache item should be included
if (yield check_condition(name, value)):
# If so, put all of its values into the value cache so the next one
# can be returned immediately next time this method is called.
self._remaining.extend([(name, k, v) for (k, v) in value.items()])
# Now actually give out a value, if there is one.
if self._remaining:
returnValue(self._remaining.pop())
# Otherwise the entire cache has been visited and the iterator is complete.
# Sadly we cannot signal completion with StopIteration, because the iterator
# protocol isn't going to add an errback to this Deferred and check for
# StopIteration. So signal completion with a simple None value.
returnValue(None)
@inlineCallbacks
def process_chunk(myiter, num):
for i in xrange(num):
nextval = yield myiter.next()
if nextval is None:
# The iterator signaled completion via the special None value.
# Processing is complete.
returnValue(True)
# Otherwise process the value.
yield some_processing(nextval)
# Indicate there is more processing to be done.
returnValue(False)
def sleep(sec):
# Simple helper to delay asynchronously for some number of seconds.
return deferLater(reactor, sec, lambda: None)
@inlineCallbacks
def process_loop(cached):
myiter = cacheiter(cached)
while True:
# Loop processing 10 items from myiter at a time, until process_chunk signals
# there are no values left.
result = yield process_chunk(myiter, 10)
if result:
print 'All done'
break
print 'More left'
# Insert the 5 second delay before starting on the next chunk.
yield sleep(5)
d = process_loop(cached)
Run Code Online (Sandbox Code Playgroud)
但是,您可以采用的另一种方法是使用twisted.internet.task.cooperate. cooperate采用迭代器并消耗它,假设消耗它可能是昂贵的,并且在多个反应器迭代中分离工作.cacheiter从上面的定义:
from twisted.internet.task import cooperate
def process_loop(cached):
finished = []
def process_one(value):
if value is None:
finished.append(True)
else:
return some_processing(value)
myiter = cacheiter(cached)
while not finished:
value_deferred = myiter.next()
value_deferred.addCallback(process_one)
yield value_deferred
task = cooperate(process_loop(cached))
d = task.whenDone()
Run Code Online (Sandbox Code Playgroud)
zee*_*kay -1
尝试将迭代器编写为DeferredGenerator.