Raf*_*ros 4 python google-app-engine asynchronous app-engine-ndb
我有一个带有3个参数的tasklet,一个带有id和title的字典,一个用户密钥和一个已经存在于数据库中的记录列表.在for循环中调用函数defer_fetch N次.
@ndb.tasklet
def defer_fetch(data, user_key, already_inserted):
if data['_id'] not in already_inserted:
document_key = yield Document.fetch_or_create(data)
yield defer_fetch_document(user_key, document_key)
else:
document_key = alread_inserted[data['_id']]
raise ndb.Return(document_key)
@ndb.tasklet
def defer_fetch_document(user_key, document_key):
deferred.defer(process_document, user_key, document_key, _queue="process-documents")
raise ndb.Return(True)
Run Code Online (Sandbox Code Playgroud)
document.fetch_or_create的代码在defer_fetch的所有调用之间并行执行,但调用fetch_document不是,如附件所示

如何使defer_fetch_document也并行运行?
据我所知,deferred与ndb.tasklets不能很好地兼容(deferred.defer没有收益,所以代码只是同步运行).相反,您应该直接使用任务队列的add_async功能.
@ndb.tasklet
def defer_fetch_document(user_key, document_key):
queue = taskqueue.Queue("process-documents")
task = taskqueue.Task(url="<url for worker>",
params={"document_key": document_key.urlsafe()})
yield queue.add_async(task) #this returns a rpc which you can yield on
raise ndb.Return(True)
Run Code Online (Sandbox Code Playgroud)
或者,生成一个文档键列表(从中生成任务列表),然后使用任务列表调用add_async.
| 归档时间: |
|
| 查看次数: |
916 次 |
| 最近记录: |