Python Twisted: iterators and yield/inlineCallbacks

hel*_*arn 18 python twisted

Folks, I'm thoroughly confused, so much so that I can't even ask the question properly, but here goes:

I have a Twisted application that uses inlineCallbacks. Now I need to define an iterator, which means a generator is returned to the caller. However, an iterator can't be decorated with inlineCallbacks, can it? If not, how do I write code like the following?

Just to clarify: the goal is for process_loop to be called every, say, 5 seconds, process only one chunk of, say, 10 items, and then let go. However, to know which 10 items to process (they are stored in a cache, which is a dict of dicts), it needs to call a function that returns a Deferred.

@inlineCallbacks  ### can't have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename, cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k, v in cachevalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'

Jea*_*one 12

You're right: you can't express what you want to express with cacheiter. The inlineCallbacks decorator won't give you a function that returns an iterator. If you decorate a function with it, the result is a function that always returns a Deferred. That's what it's for.

Part of what makes this difficult is that iterators don't work well with asynchronous code. If a Deferred is involved in producing your iterator's elements, then the elements that come out of your iterator are going to be Deferreds first.

You might do something like this to account for that:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

This can work, but it looks particularly weird. Since generators can only yield to their caller (not, for example, to their caller's caller), the some_jobs iterator can't help here; only code lexically within process_work can yield a Deferred to the inlineCallbacks trampoline for it to wait on.
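That restriction is just how Python generators work, independent of Twisted. A plain-Python sketch (names invented for illustration):

```python
def inner():
    # This yield delivers its value to whoever iterates inner() -- here,
    # the for loop in outer() -- not to outer()'s caller.
    yield 'job'

def outer():
    for value in inner():
        # outer() must explicitly re-yield to pass anything further up.
        yield 'outer-saw-' + value

print(list(outer()))
```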

If you don't mind that pattern, then we could imagine your code being written something like this:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])
                    # Stop scanning once one item's values are cached; the scan
                    # resumes on a later call, after these values are handed out.
                    break

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

However, another approach you can take is to use twisted.internet.task.cooperate. cooperate takes an iterator and consumes it, assuming that consuming it may be expensive, and splits the work across multiple reactor iterations. Taking the definition of cacheiter from above:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()


zee*_*kay -1

Try writing your iterator as a DeferredGenerator.

  • deferredGenerator is just the older version of inlineCallbacks; the OP is essentially already doing that. (3 upvotes)