同时观察子进程的stdout和stderr

Ale*_*eph 8 python subprocess python-asyncio

如何同时查看长时间运行的子进程的标准输出和标准错误,在子进程生成后立即处理每一行?

我不介意使用Python3.6的异步工具来制作我希望在两个流中的每个流上的非阻塞异步循环,但这似乎无法解决问题.以下代码:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()
Run Code Online (Sandbox Code Playgroud)

输出:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1
Run Code Online (Sandbox Code Playgroud)

虽然我希望它输出如下:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2
Run Code Online (Sandbox Code Playgroud)

use*_*342 8

要做到这一点,你需要一个函数,它将采用两个异步序列并合并它们,从而产生其中一个或另一个的结果,因为它们变得可用.有了库存这样的功能,run可能看起来像这样:

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())
Run Code Online (Sandbox Code Playgroud)

像函数merge不(还)存在在标准库,但aiostream外部库提供一个.您也可以使用异步生成器编写自己的asyncio.wait():

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret
Run Code Online (Sandbox Code Playgroud)

上面的run一个细节仍然与您想要的输出不同:它不会区分输出和错误行.但是,通过使用指标装饰线条可以很容易地实现这一点:

async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())
Run Code Online (Sandbox Code Playgroud)


use*_*342 5

我突然想到这个问题实际上有一个更简单的解决方案,至少如果监视代码不需要在单个协程调用中。

您可以做的是生成两个单独的协程,一个用于 stdout,一个用于 stderr。并行运行它们将为您提供所需的语义,您可以使用gather它们来等待它们的完成:

def watch(stream, prefix=''):
    async for line in stream:
        print(datetime.now(), prefix, line.decode().strip())

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))
Run Code Online (Sandbox Code Playgroud)