cf *_*ica 5 python python-3.x python-asyncio
作为我上一个关于从同步函数调用异步函数的问题的后续,我发现了asyncio.run_coroutine_threadsafe。
从纸面上看,这看起来很理想。根据StackOverflow 问题中的评论,这看起来很理想。我可以创建一个新线程,获取对原始事件循环的引用,并安排异步函数在原始事件循环内运行,同时仅阻塞新线程。
class _AsyncBridge:
def call_async_method(self, function, *args, **kwargs):
print(f"call_async_method {threading.get_ident()}")
event_loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor()
return thread_pool.submit(asyncio.run, self._async_wrapper(event_loop, function, *args, **kwargs)).result()
async def _async_wrapper(self, event_loop, function, *args, **kwargs):
print(f"async_wrapper {threading.get_ident()}")
future = asyncio.run_coroutine_threadsafe(function(*args, **kwargs), event_loop)
return future.result()
Run Code Online (Sandbox Code Playgroud)
这不会出错,但也不会返回。期货只是挂起,异步调用永远不会被命中。call_async_method我是否在、_async_wrapper或两者中使用 Future 似乎并不重要;无论我在哪里使用 Future,它都会挂起。
我尝试将调用run_coroutine_threadsafe直接放入主事件循环中:
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)
未来也悬在这里。
我尝试使用此处LoopExecutor定义的类,这似乎完全满足了我的需求。
event_loop = asyncio.get_event_loop()
loop_executor = LoopExecutor(event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)
返回的 Future 也挂在那里。
我考虑过我阻塞了原来的事件循环,因此计划的任务永远不会运行,所以我创建了一个新的事件循环:
event_loop = asyncio.get_event_loop()
new_event_loop = asyncio.new_event_loop()
print(event_loop == new_event_loop) # sanity check to make sure the new loop is actually different from the existing one - prints False as expected
loop_executor = LoopExecutor(new_event_loop)
future = loop_executor.submit(cls._do_work_async, arg1=arg1, arg2=arg2, arg3=arg3)
return_value = future.result()
return return_value
Run Code Online (Sandbox Code Playgroud)
仍然悬而未决future.result(),我不明白为什么。
asyncio.run_coroutine_threadsafe我使用它的方式有什么问题吗?
我认为有两个问题。第一个是run_coroutine_threadsafe只提交协程但不真正运行它。
所以
event_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(cls._do_work_async(arg1, arg2, arg3), event_loop)
return_value = future.result()
Run Code Online (Sandbox Code Playgroud)
不起作用,因为你从未运行过这个循环。
为了让它工作,理论上你可以使用asyncio.run(future),但实际上你不能,也许是因为它是由 提交的run_coroutine_threadsafe。以下将起作用:
import asyncio
async def stop():
await asyncio.sleep(3)
event_loop = asyncio.get_event_loop()
coro = asyncio.sleep(1, result=3)
future = asyncio.run_coroutine_threadsafe(coro, event_loop)
event_loop.run_until_complete(stop())
print(future.result())
Run Code Online (Sandbox Code Playgroud)
第二个问题是,我想你已经注意到你的结构在某种程度上被颠倒了。您应该在单独的线程中运行事件循环,但从主线程提交任务。如果你在单独的线程中提交它,你仍然需要在主线程中运行事件循环来实际执行它。大多数情况下,我建议在单独的线程中创建另一个事件循环。
| 归档时间: |
|
| 查看次数: |
5244 次 |
| 最近记录: |