Python asyncio 从“call_soon_threadsafe”收集返回值

lns*_*shi 2 python coroutine python-3.x python-asyncio

我试图理解 python asyncio 的call_soon_threadsafe API,但失败了,使用下面的示例代码,如果我的simple协程想要返回某些内容,我应该如何从调用方获取返回值?

import time
import asyncio as aio
import uvloop

from threading import Thread

aio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def simple(a, fut:aio.Future):
  await aio.sleep(a)
  return fut.set_result(a)

def delegator(loop):
  aio.set_event_loop(loop)
  loop.run_forever()

loop_exec = aio.new_event_loop()

t = Thread(target=delegator, args=(loop_exec,))
t.start()


if __name__ == '__main__':
  start_time = time.time()

  fut = loop_exec.create_future() # tried to get back returned value by future
  handle = loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, fut))
  res = aio.wait_for(fut, 10)

  print('Time consumed: {}s'.format(time.time() - start_time))
  print('>>>>>>>>>>', res)

# Output
Time consumed: 3.2901763916015625e-05s
>>>>>>>>>> <generator object wait_for at 0x110bb9b48>
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,我试图通过将 future 传递给在不同线程中运行的协程来获取返回值,但仍然不知道如何正确获取它。

基本上有两个问题:

  1. 通过上面的示例代码,我如何从调用方取回返回值?
  2. 这个的实际用例是什么call_soon_threadsafe,只是感觉run_coroutine_threadsafe用起来比较方便,而且能够覆盖几乎所有我能想象到的这种不同线程协程交互的情况。

use*_*342 6

通过上面的示例代码,我如何从调用方取回返回值?

由于事件循环在主线程之外运行,因此您需要使用线程感知的同步设备。例如:

async def simple(a, event):
    await asyncio.sleep(a)
    event.simple_result = a
    event.set()

done = threading.Event()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
done.wait(10)
res = done.simple_result
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用 a 进行同步concurrent.futures.Future,这就像带有对象负载的一次性事件。(请注意,您不能使用 asyncio future,因为它不是线程安全的。)

async def simple(a, fut):
    await asyncio.sleep(a)
    fut.set_result(a)

done = concurrent.futures.Future()
loop_exec.call_soon_threadsafe(aio.ensure_future, simple(3, done))
res = done.result(10)
Run Code Online (Sandbox Code Playgroud)

concurrent.futures.Future尽管像上面的示例一样创建“裸”是可行的,但文档不鼓励这样做。然而,正如文森特在评论中指出的那样,这run_coroutine_threadsafe对你来说是这样的:

async def simple(a):
    await asyncio.sleep(a)
    return a

fut = asyncio.run_coroutine_threadsafe(simple(3))
res = fut.result(10)
Run Code Online (Sandbox Code Playgroud)

这的实际用例是什么call_soon_threadsafe

最简单的答案是,这call_soon_threadsafe是一个较低级别的 API,当您只想告诉事件循环执行或开始执行某些操作时,可以使用它。call_soon_threadsafe是用于实现诸如run_coroutine_threadsafe以及许多其他功能的构建块。至于为什么你想自己使用管道功能.​​.....

有时您想要执行普通函数,而不是协程。有时你的函数是“即发即弃”的,你并不关心它的返回值。(或者也许该函数最终会通过某个侧面通道通知您其完成。)在这些情况下,这call_soon_threadsafe是完成该作业的正确工具,因为它更轻量级,因为它不会尝试创建附加concurrent.futures.Future并将其附加到执行的代码。例子:

  • loop.call_soon_threadsafe(loop.stop)告诉事件循环停止运行
  • loop.call_soon_threadsafe(queue.put_nowait, some_item)将某些内容添加到无界异步队列中
  • loop.call_soon_threadsafe(asyncio.create_task, coroutinefn())将协程提交到事件循环而不等待其完成
  • loop.call_soon_threadsafe(some_future.set_result, value)从不同的线程设置 asyncio future 的结果
  • 这个答案中的低级代码