如何在 Python 中包装自定义 future 以便与 asyncio 一起使用?

kir*_*kov 4 python python-3.x python-asyncio concurrent.futures

有很多库使用其自定义版本的Futurekafkas3transfer只是两个例子:它们所有自定义的类似未来的类都具有object超类。

毫不奇怪,您不能直接调用asyncio.wrap_future()此类对象,也不能await与它们一起使用。

包装此类 future 以便与 asyncio 一起使用的正确方法是什么?

use*_*342 6

如果 future 类支持标准 future 功能,例如完成回调和result方法,只需使用如下内容:

def wrap_future(f):
    loop = asyncio.get_event_loop()
    aio_future = loop.create_future()
    def on_done(*_):
        try:
            result = f.result()
        except Exception as e:
            loop.call_soon_threadsafe(aio_future.set_exception, e)
        else:
            loop.call_soon_threadsafe(aio_future.set_result, result)
    f.add_done_callback(on_done)
    return aio_future
Run Code Online (Sandbox Code Playgroud)

将该代码视为一个模板,您可以自定义该模板以匹配您正在处理的未来的具体情况。

预期用法是从运行 asyncio 事件循环的线程中调用它:

value = await wrap_future(some_foreign_future)
Run Code Online (Sandbox Code Playgroud)

如果从不同的线程调用它,请确保loop显式传递,因为asyncio.get_event_loop从未注册 asyncio 的线程调用时会失败。