异步推迟谷歌应用引擎中的多个任务

Raf*_*ros 4 python google-app-engine asynchronous app-engine-ndb

我有一个带有3个参数的tasklet,一个带有id和title的字典,一个用户密钥和一个已经存在于数据库中的记录列表.在for循环中调用函数defer_fetch N次.

@ndb.tasklet
def defer_fetch(data, user_key, already_inserted):
    if data['_id'] not in already_inserted:
         document_key = yield Document.fetch_or_create(data)
         yield defer_fetch_document(user_key, document_key)
    else:
         document_key = alread_inserted[data['_id']]
    raise ndb.Return(document_key)

@ndb.tasklet
def defer_fetch_document(user_key, document_key):
    deferred.defer(process_document, user_key, document_key, _queue="process-documents")
    raise ndb.Return(True)
Run Code Online (Sandbox Code Playgroud)

document.fetch_or_create的代码在defer_fetch的所有调用之间并行执行,但调用fetch_document不是,如附件所示

电话瀑布

如何使defer_fetch_document也并行运行?

Rob*_*tis 5

据我所知,deferred与ndb.tasklets不能很好地兼容(deferred.defer没有收益,所以代码只是同步运行).相反,您应该直接使用任务队列的add_async功能.

@ndb.tasklet
def defer_fetch_document(user_key, document_key):
    queue = taskqueue.Queue("process-documents")
    task = taskqueue.Task(url="<url for worker>", 
                          params={"document_key": document_key.urlsafe()})
    yield queue.add_async(task)  #this returns a rpc which you can yield on
    raise ndb.Return(True)
Run Code Online (Sandbox Code Playgroud)

或者,生成一个文档键列表(从中生成任务列表),然后使用任务列表调用add_async.