asyncio.run_coroutine_threadsafe 的未来永远挂起?

cf *_*ica 5 python python-3.x python-asyncio

作为我上一个关于从同步函数调用异步函数的问题的后续,我发现了asyncio.run_coroutine_threadsafe

从纸面上看,这看起来很理想。根据StackOverflow 问题中的评论,这看起来很理想。我可以创建一个新线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时仅阻塞新线程。

class _AsyncBridge:
    def call_async_method(self, function, *args, **kwargs):
        print(f"call_async_method {threading.get_ident()}")
        event_loop = asyncio.get_event_loop()
        thread_pool = ThreadPoolExecutor()
        return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()

    async def _async_wrapper(self, event_loop, function, *args, **kwargs):
        print(f"async_wrapper {threading.get_ident()}")
        future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
        return future.result()
Run Code Online (Sandbox Code Playgroud)

这不会出错,但也不会返回。期货只是挂起,异步调用永远不会被命中。call_async_method我是否在、_async_wrapper或两者中使用 Future 似乎并不重要;无论我在哪里使用 Future,它都会挂起。

我尝试将调用run_coroutine_threadsafe直接放入主事件循环中:

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)

未来也悬在这里。

我尝试使用此处LoopExecutor定义的类,这似乎完全满足了我的需求。

event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)

返回的 Future 也挂在那里。

我考虑过我阻塞了原来的事件循环,因此计划的任务永远不会运行,所以我创建了一个新的事件循环:

event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value
Run Code Online (Sandbox Code Playgroud)

仍然悬而未决future.result(),我不明白为什么。

asyncio.run_coroutine_threadsafe我使用它的方式有什么问题吗?

Sra*_*raw 5

我认为有两个问题。第一个是run_coroutine_threadsafe只提交协程但不真正运行它。

所以

event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)

不起作用,因为你从未运行过这个循环。

为了让它工作,理论上你可以使用asyncio.run(future),但实际上你不能,也许是因为它是由 提交的run_coroutine_threadsafe。以下将起作用:

import asyncio

async def stop():
    await asyncio.sleep(3)

event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
event_loop.run_until_complete(stop())
print(future.result())
Run Code Online (Sandbox Code Playgroud)

第二个问题是,我想你已经注意到你的结构在某种程度上被颠倒了。您应该在单独的线程中运行事件循环,但从主线程提交任务。如果你在单独的线程中提交它,你仍然需要在主线程中运行事件循环来实际执行它。大多数情况下,我建议在单独的线程中创建另一个事件循环。

  • 因此,如果我使用新的事件循环创建一个新线程,将该新的事件循环绑定到异步库,然后*从我的主线程的事件循环*调用异步库的方法,它应该可以工作吗?还是我误解了?我仍然不太明白在哪里可以调用 `run_until_complete` 因为我认为我需要它与 `run_coroutine_threadsafe` 得到相同的循环,并且由于异步库,该事件循环已经在永久的 `run_until_complete` 中。 (2认同)