lns*_*shi 2 python coroutine python-3.x python-asyncio
我试图理解 python asyncio 的call_soon_threadsafe API,但失败了,使用下面的示例代码,如果我的simple协程想要返回某些内容,我应该如何从调用方获取返回值?
import time
import asyncio as aio
import uvloop
from threading import Thread
aio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def simple(a, fut:aio.Future):
await aio.sleep(a)
return fut.set_result(a)
def delegator(loop):
aio.set_event_loop(loop)
loop.run_forever()
loop_exec = aio.new_event_loop()
t = Thread(target=delegator, args=(loop_exec,))
t.start()
if __name__ == '__main__':
start_time = time.time()
fut = loop_exec.create_future() # tried to get back returned value by future
handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
res = aio.wait_for(fut, 10)
print('Time consumed: {}s'.format(time.time() - start_time))
print('>>>>>>>>>>', res)
# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,我试图通过将 future 传递给在不同线程中运行的协程来获取返回值,但仍然不知道如何正确获取它。
基本上有两个问题:
call_soon_threadsafe,只是感觉run_coroutine_threadsafe用起来比较方便,而且能够覆盖几乎所有我能想象到的这种不同线程协程交互的情况。通过上面的示例代码,我如何从调用方取回返回值?
由于事件循环在主线程之外运行,因此您需要使用线程感知的同步设备。例如:
async def simple(a, event):
await asyncio.sleep(a)
event.simple_result = a
event.set()
done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result
Run Code Online (Sandbox Code Playgroud)
或者,您可以使用 a 进行同步concurrent.futures.Future,这就像带有对象负载的一次性事件。(请注意,您不能使用 asyncio future,因为它不是线程安全的。)
async def simple(a, fut):
await asyncio.sleep(a)
fut.set_result(a)
done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)
Run Code Online (Sandbox Code Playgroud)
concurrent.futures.Future尽管像上面的示例一样创建“裸”是可行的,但文档不鼓励这样做。然而,正如文森特在评论中指出的那样,这run_coroutine_threadsafe对你来说是这样的:
async def simple(a):
await asyncio.sleep(a)
return a
fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)
Run Code Online (Sandbox Code Playgroud)
这的实际用例是什么
call_soon_threadsafe
最简单的答案是,这call_soon_threadsafe是一个较低级别的 API,当您只想告诉事件循环执行或开始执行某些操作时,可以使用它。call_soon_threadsafe是用于实现诸如run_coroutine_threadsafe以及许多其他功能的构建块。至于为什么你想自己使用管道功能......
有时您想要执行普通函数,而不是协程。有时你的函数是“即发即弃”的,你并不关心它的返回值。(或者也许该函数最终会通过某个侧面通道通知您其完成。)在这些情况下,这call_soon_threadsafe是完成该作业的正确工具,因为它更轻量级,因为它不会尝试创建附加concurrent.futures.Future并将其附加到执行的代码。例子:
loop.call_soon_threadsafe(loop.stop)告诉事件循环停止运行loop.call_soon_threadsafe(queue.put_nowait, some_item)将某些内容添加到无界异步队列中loop.call_soon_threadsafe(asyncio.create_task, coroutinefn())将协程提交到事件循环而不等待其完成loop.call_soon_threadsafe(some_future.set_result, value)从不同的线程设置 asyncio future 的结果| 归档时间: |
|
| 查看次数: |
5356 次 |
| 最近记录: |