如何防止ndb批处理put_async()并使其立即发出RPC?

Pas*_*que 5 google-app-engine app-engine-ndb

我有一个请求处理程序,它更新实体,将其保存到数据存储区,然后需要在返回之前执行一些额外的工作(比如排队后台任务和json序列化一些结果).我希望并行化此代码,以便在保存实体时完成其他工作.

这是我的处理程序代码归结为:

class FooHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def post(self):
        foo = yield Foo.get_by_id_async(some_id)

        # Do some work with foo

        # Don't yield, as I want to perform the code that follows
        # while foo is being saved to the datastore.
        # I'm in a toplevel, so the handler will not exit as long as
        # this async request is not finished.
        foo.put_async()

        taskqueue.add(...)
        json_result = generate_result()
        self.response.headers["Content-Type"] = "application/json; charset=UTF-8"
        self.response.write(json_result)
Run Code Online (Sandbox Code Playgroud)

但是,Appstats显示datastore.PutRPC在以下情况下连续完成taskqueue.Add:

Appstats截图

稍微挖掘一下就ndb.context.py表明put_async()呼叫最终被添加到一个AutoBatcher而不是立即发出的RPC.

所以我假设_put_batchertoplevel等待所有异步调用完成时,最终会被刷新.

我知道批处理put在某些情况下有实际的好处,但在我的情况下,我真的希望立即发送put RPC,所以我可以在保存实体时执行其他工作.

如果我这样做yield foo.put_async(),那么我在Appstats中获得了相同的瀑布,但是datastore.Put在完成之前完成了:

第二个Appstats截图

这是可以预料的,因为yield我的处理程序put_async()在执行其余代码之前等待调用完成.

我也试过ndb.get_context().flush()在之后添加一个调用foo.put_async(),但根据Appstats datastore.Put,taskqueue.BulkAdd调用仍然没有并行调用.

所以我的问题是:如何强制调用put_async()绕过自动批处理并立即发出RPC?

Gui*_*sum 6

没有支持的方法来做到这一点.也许应该有.你能试试这个吗?

loop - ndb.eventloop.get_event_loop()
while loop.run_idle():
    pass
Run Code Online (Sandbox Code Playgroud)

您可能必须查看ndb/eventloop.py的源代码以查看您还可以尝试的其他内容 - 基本上您想要尝试run0()所做的大部分工作,除非等待RPC.特别是,你可能必须这样做:

while loop.current:
    loop.run0()
while loop.run_idle():
    pass
Run Code Online (Sandbox Code Playgroud)

(这仍然不受支持,因为您可能还需要处理其他条件,但在您的示例中似乎没有这些条件.)