Python asyncio:在工作线程上运行subprocess_exec

Sil*_*ler 6 python linux asynchronous python-3.x python-asyncio

所以我使用Python asyncio模块(在Linux上)启动子进程,然后异步监视它.我的代码工作正常...在主线程上运行时.但是当我在工作线程上运行它时,它会挂起,并且process_exited永远不会调用回调.

我怀疑这可能实际上是某种未记录的缺陷或subprocess_exec在工作线程上运行的问题,可能与实现如何处理后台线程中的信号有关.但它也可能只是让我搞砸了.

一个简单,可重复的例子如下:

class MyProtocol(asyncio.SubprocessProtocol):
    def __init__(self, done_future):
        super().__init__()
        self._done_future = done_future

    def pipe_data_received(self, fd, data):
        print("Received:", len(data))

    def process_exited(self):
        print("PROCESS EXITED!")
        self._done_future.set_result(None)

def run(loop):
    done_future = asyncio.Future(loop = loop)
    transport = None
    try:
        transport, protocol = yield from loop.subprocess_exec(
            lambda : MyProtocol(done_future),
            "ls",
            "-lh",
            stdin = None
        )
        yield from done_future
    finally:
        if transport: transport.close()

    return done_future.result()

def run_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) # bind event loop to current thread

    try:
        return loop.run_until_complete(run(loop))
    finally:
        loop.close()
Run Code Online (Sandbox Code Playgroud)

所以在这里,我设置了一个asyncio事件循环来执行shell命令ls -lh,然后触发从子进程接收数据时的回调,以及子进程退出时的另一个回调.

如果我只是run_loop()直接在Python程序的主线程中调用,一切都很顺利.但如果我说:

t = threading.Thread(target = run_loop)
t.start()
t.join()
Run Code Online (Sandbox Code Playgroud)

然后会发生的是pipe_data_received()回调被成功调用,但从process_exited()未被调用过,程序就会挂起.

在谷歌搜索并查看asyncio实现的源代码后unix_events.py,我发现可能需要手动将我的事件循环附加到全局"子观察器"对象,如下所示:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) # bind event loop to current thread
asyncio.get_child_watcher().attach_loop(loop)
Run Code Online (Sandbox Code Playgroud)

显然,儿童观察者是一个(未记录的)对象,负责waitpid在引擎盖下打电话(或类似的东西).但当我尝试这个,并run_event_loop()在后台线程中运行时,我得到了错误:

  File "/usr/lib/python3.4/asyncio/unix_events.py", line 77, in add_signal_handler
    raise RuntimeError(str(exc))
RuntimeError: set_wakeup_fd only works in main thread
Run Code Online (Sandbox Code Playgroud)

所以这看起来实现实际上做了检查以确保信号处理程序只能在主线程上使用,这让我相信在当前实现中,subprocess_exec在后台线程上使用实际上是根本不可能的,如果不改变Python源代码本身.

我对此是否正确?可悲的是,该asyncio模块的记录很少,所以我很难对这里的结论充满信心.我可能只是做错了什么.

Vin*_*ent 5

只要在主线程中运行asyncio循环且其子监视程序实例化,就可以在辅助线程中处理子进程:

asyncio.get_child_watcher()
loop = asyncio.get_event_loop()
coro = loop.run_in_executor(None, run_loop)
loop.run_until_complete(coro)
Run Code Online (Sandbox Code Playgroud)

请参阅这篇文章文档