python中子进程读取线上的超时

Tom*_*Tom 39 python subprocess timeout

我有一个小问题,我不太确定如何解决.这是一个最小的例子:

是)我有的

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    some_criterium = do_something(line)
Run Code Online (Sandbox Code Playgroud)

我想要什么

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    if nothing_happens_after_10s:
        break
    else:
        some_criterium = do_something(line)
Run Code Online (Sandbox Code Playgroud)

我从子进程读取一行并用它做一些事情.我想要的是在固定时间间隔后没有线路到达时退出.有什么建议?

Tom*_*Tom 21

感谢所有的答案!我找到了一种方法来解决我的问题,只需使用select.poll来查看stdout.

import select
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)   
while(some_criterium and not time_limit):
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)
Run Code Online (Sandbox Code Playgroud)

  • 虽然这似乎有效,但它并不健壮 - 考虑一下你的子进程是否输出了没有换行的东西.`select` /`poll`将触发,但`readline`将无限期阻塞. (9认同)
  • 可能无法在Windows上运行,其中`select.poll()`仅适用于套接字.https://docs.python.org/2/library/select.html (5认同)

jfs*_*jfs 17

这是一个便携式解决方案,使用以下方法强制执行读取单行的超时asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def run_command(*args, timeout=None):
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec(*args,
            stdout=PIPE, stderr=STDOUT)

    # read line (sequence of bytes ending with b'\n') asynchronously
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            pass
        else:
            if not line: # EOF
                break
            elif do_something(line): 
                continue # while some criterium is satisfied
        process.kill() # timeout or some criterium is not satisfied
        break
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

returncode = loop.run_until_complete(run_command("cmd", "arg 1", "arg 2",
                                                 timeout=10))
loop.close()
Run Code Online (Sandbox Code Playgroud)

  • @jftuga:`.read()` 在这里是不正确的。问题是关于`.readline()`。如果您需要所有输出,那么使用带有超时的`.communicate()` 会更简单。阅读[我在使用`.communicate()`的答案下的评论](http://stackoverflow.com/questions/10756383/timeout-on-subprocess-readline-in-python/34114767?noredirect=1#comment55978734_10756738)。 (2认同)

Flo*_*ogo 10

我在python中使用了一些更通用的东西(IIRC也从SO问题拼凑而成,但我记不起哪些).

import thread
from threading import Timer

def run_with_timeout(timeout, default, f, *args, **kwargs):
    if not timeout:
        return f(*args, **kwargs)
    try:
        timeout_timer = Timer(timeout, thread.interrupt_main)
        timeout_timer.start()
        result = f(*args, **kwargs)
        return result
    except KeyboardInterrupt:
        return default
    finally:
        timeout_timer.cancel()
Run Code Online (Sandbox Code Playgroud)

但是要注意,这会使用中断来阻止你给它的任何功能.对于所有函数来说这可能不是一个好主意,它也会阻止你在超时期间用ctrl + c关闭程序(即ctrl + c将作为超时处理)你可以使用这个调用它:

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = run_with_timeout(timeout, None, scan_process.stdout.readline)
    if line is None:
        break
    else:
        some_criterium = do_something(line)
Run Code Online (Sandbox Code Playgroud)

不过可能有点矫枉过正了.我怀疑你的案子有一个更简单的选择,我不知道.


jco*_*ctx 6

虽然Tom 的解决方案有效,但select()C成语中使用更紧凑,这相当于您的答案:

from select import select
scan_process = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                bufsize=1)  # Line buffered
while some_criterium and not time_limit:
    poll_result = select([scan_process.stdout], [], [], time_limit)[0]
Run Code Online (Sandbox Code Playgroud)

其余的都是一样的。

pydoc select.select

[注意:这是特定于 Unix 的,其他一些答案也是如此。]

[注 2:编辑以根据 OP 请求添加行缓冲]

[注3:行缓冲并非在所有情况下都可靠,导致readline()阻塞]

  • 您不知道子进程的 stdout 是否是行缓冲的(`bufsize=1` 对子进程没有影响;它仅调节父进程中用于读取输出的缓冲区)并且通常标准输出是块缓冲的,如果它被重定向到管道,即,`select()` 可能会在没有完整行的情况下返回。 (3认同)
  • 另外,为了避免像@Tom的答案一样阻塞`.readline()`,请在`select`之后使用`os.read(scan_process.stdout.fileno(), 512)`(如果其他东西可以访问管道),但与“.readline()”相比,在“select”之后阻塞的可能性更小。 (2认同)

jfs*_*jfs 5

一个可移植的解决方案是,如果读取一行的时间过长,则使用线程杀死子进程:

#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT

timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
           universal_newlines=True) as process:  # text mode
    # kill process in timeout seconds unless the timer is restarted
    watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
    watchdog.start()
    for line in process.stdout:
        # don't invoke the watcthdog callback if do_something() takes too long
        with watchdog.blocked:
            if not do_something(line):  # some criterium is not satisfied
                process.kill()
                break
            watchdog.restart()  # restart timer just before reading the next line
    watchdog.cancel()
Run Code Online (Sandbox Code Playgroud)

其中WatchdogTimer类就像threading.Timer时可以重新启动和/或封端:

from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic  # use time.time or monotonic.monotonic on Python 2

class WatchdogTimer(Thread):
    """Run *callback* in *timeout* seconds unless the timer is restarted."""

    def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
        super().__init__(**kwargs)
        self.timeout = timeout
        self.callback = callback
        self.args = args
        self.timer = timer
        self.cancelled = Event()
        self.blocked = Lock()

    def run(self):
        self.restart() # don't start timer until `.start()` is called
        # wait until timeout happens or the timer is canceled
        while not self.cancelled.wait(self.deadline - self.timer()):
            # don't test the timeout while something else holds the lock
            # allow the timer to be restarted while blocked
            with self.blocked:
                if self.deadline <= self.timer() and not self.cancelled.is_set():
                    return self.callback(*self.args)  # on timeout

    def restart(self):
        """Restart the watchdog timer."""
        self.deadline = self.timer() + self.timeout

    def cancel(self):
        self.cancelled.set()
Run Code Online (Sandbox Code Playgroud)