发送asyncio任务以在其他线程中循环运行

MRo*_*lin 18 python python-asyncio

我怎样才能异步插入任务来运行 asyncio在另一个线程中事件循环中运行?

我的动机是在解释器中支持交互式异步工作负载.我无法阻止主REPL线程.

我目前有缺陷的理解说,以下应该有效.为什么不呢?什么是实现上述目标的更好方法?

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

asyncio.async(g(), loop=loop)
Run Code Online (Sandbox Code Playgroud)

Jas*_*ohi 8

您必须使用call_soon_threadsafe来安排来自不同线程的回调:

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

loop.call_soon_threadsafe(asyncio.async, g())
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请参阅https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading.

编辑:支持异步工作负载的解释器示例

# vim: filetype=python3 tabstop=2 expandtab

import asyncio as aio
import random

@aio.coroutine
def async_eval(input_, sec):
  yield from aio.sleep(sec)
  print("")
  try:
    result = eval(input_)
  except Exception as e:
    print("< {!r} does not compute >".format(input_))
  else:  
    print("< {!r} = {} >".format(input_, result))

@aio.coroutine
def main(loop):
  while True:
    input_ = yield from loop.run_in_executor(None, input, "> ")

    if input_ == "quit":
      break
    elif input_ == "":
      continue
    else:
      sec = random.uniform(5, 10)
      print("< {!r} scheduled for execution in {:.02} sec>".format(input_, sec))
      aio.async(async_eval(input_, sec))

loop = aio.get_event_loop()

loop.run_until_complete(main(loop))
loop.close()
Run Code Online (Sandbox Code Playgroud)

  • 如果您需要在另一个线程中的事件循环上运行协程,请使用:```asyncio.run_coroutine_threadsafe(my_coro(param1), loop)``` (6认同)
  • 3.7 的有效语法是什么-我也遇到了这个 (2认同)

Jim*_*mes 5

Jashandeep Sohi 回答中的第一个示例在 3.7+ 中对我不起作用,并打印有关已弃用注释的警告。我把它改造成一个在 3.8 下运行的东西。我也稍微调整了一下以满足我的需求。我是 Python 中的多线程的新手(但一般没有多线程),因此感谢任何建议、指导等:

import asyncio
from threading import Thread


loop = asyncio.new_event_loop()
running = True


def evaluate(future):
    global running
    stop = future.result()
    if stop:
        print("press enter to exit...")
        running = False


def side_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread = Thread(target=side_thread, args=(loop,), daemon=True)
thread.start()


async def display(text):
    await asyncio.sleep(5)
    print("echo:", text)
    return text == "exit"


while running:
  text = input("enter text: ")
  future = asyncio.run_coroutine_threadsafe(display(text), loop)
  future.add_done_callback(evaluate)


print("exiting")
Run Code Online (Sandbox Code Playgroud)

echo 和其他输出将与提示冲突,但它应该足以证明它工作正常。

我不确定的一件事是running从一个线程设置全局并从另一个线程读取它。我想也许 GIL 会同步线程缓存,但我很想得到确认(或不确认)。