python中subprocess.PIPE上的非阻塞读取

Mat*_*agé 477 python io subprocess nonblocking

我正在使用子进程模块启动子进程并连接到它的输出流(stdout).我希望能够在其标准输出上执行非阻塞读取.有没有办法让.readline非阻塞或在我调用之前检查流上是否有数据.readline?我希望这是可移植的,或至少在Windows和Linux下工作.

这是我现在如何做到的(.readline如果没有数据可用,则阻止它):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Run Code Online (Sandbox Code Playgroud)

jfs*_*jfs 393

fcntl,select,asyncproc不会在这种情况下帮助.

无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
Run Code Online (Sandbox Code Playgroud)

  • 是的,这对我有用,我删除了很多.它包括良好实践,但并非总是必要的.Python 3.x 2.X compat和close_fds可能会被省略,它仍然有效.但只要知道一切都做了什么,不要盲目地复制它,即使它只是有效!(实际上最简单的解决方案是使用一个线程并像Seb那样做一个readline,Qeues只是获取数据的简单方法,还有其他的,线程就是答案!) (6认同)
  • 如果我无法关闭子进程,例如.由于例外?stdout-reader线程不会死,python会挂起,即使主线程退出,不是吗?怎么可以解决这个问题呢?python 2.x不支持查杀线程,更糟糕的是,不支持中断它们.:((显然应该处理异常以确保子进程被关闭,但万一它不会,你能做什么?) (4认同)
  • @Justin:'out.readline'不会阻止它在另一个线程中执行的主线程. (3认同)
  • 我已经在`shelljob`http://psypi.python.org/pypi/shelljob包中创建了一些友好的包装器 (3认同)
  • 在线程内部,对`out.readline`的调用会阻塞线程和主线程,我必须等到readline返回,然后其他所有内容才会继续.有什么简单的方法吗?(我正在从我的进程中读取多行,这也是另一个正在执行数据库和事物的.py文件) (2认同)
  • close_fds绝对不是你想盲目复制到你的应用程序中的东西...... (2认同)
  • @naxa:注意`daemon = True`:如果退出主线程,python进程将不会挂起. (2认同)
  • 最终结果在 https://github.com/netinvent/command_runner 的 python 包 `command_runner` 中可见,我使用两种不同的方法以非阻塞方式获取 subprocess.Popen 输出,一种基于上面的答案,两种方法在 MSWin 和 Linux 以及多个 python 版本上完全工作和测试。@jfs,感谢您的宝贵时间。 (2认同)

Jes*_*sse 76

我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)
Run Code Online (Sandbox Code Playgroud)

在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...

  • 请不要使用繁忙的循环.使用poll()和超时来等待数据. (10认同)
  • [Jesse的回答](http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/1810703#1810703)不正确.根据Guido的说法,readline在非阻塞模式下无法正常工作,并且在Python 3000之前不会正常工作.http://bugs.python.org/issue1175#msg56041如果要使用fcntl将文件设置为非阻塞模式,你必须使用较低级别的os.read()并自己分开行.将fcntl与执行行缓冲的高级调用混合会产生麻烦. (10认同)
  • 在Python 2中使用readline似乎不正确.请参阅anonnn的答案http://stackoverflow.com/questions/375427/non-blocking-read-on-a-stream-in-python/4025909#4025909 (2认同)

jfs*_*jfs 38

Python 3.4 为异步IO 模块引入了新的临时API.asyncio

这种方法类似于twisted@Bryan Ward的基于答案的答案 - 定义一个协议,一旦数据准备就调用它的方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()
Run Code Online (Sandbox Code Playgroud)

请参阅文档中的"子流程".

有一个高级接口asyncio.create_subprocess_exec()返回允许使用coroutine异步读取行的Process对象 (使用/ Python 3.5+语法):StreamReader.readline()asyncawait

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))
Run Code Online (Sandbox Code Playgroud)

readline_and_kill() 执行以下任务:

  • 启动子进程,将其stdout重定向到管道
  • 从子进程'stdout异步读取一行
  • 杀死子进程
  • 等它退出

如有必要,每个步骤都可以通过超时秒限制.


Noa*_*oah 19

尝试使用asyncproc模块.例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out
Run Code Online (Sandbox Code Playgroud)

该模块负责S.Lott建议的所有线程.

  • asyncproc是GPL,进一步限制了它的使用:-( (25认同)
  • asyncproc在Windows上不起作用,而windows不支持os.WNOHANG :-( (12认同)

Bry*_*ard 17

您可以在Twisted中轻松完成此操作.根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道.您创建一个ProcessProtocol类,并覆盖该outReceived()方法.Twisted(取决于所使用的反应器)通常只是一个select()安装了回调的大循环来处理来自不同文件描述符(通常是网络套接字)的数据.因此,该outReceived()方法只是安装一个回调来处理来自的数据STDOUT.演示此行为的简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Run Code Online (Sandbox Code Playgroud)

扭曲的文档对此有一些有用的信息.

如果你围绕Twisted构建整个应用程序,它会与本地或远程的其他进程进行异步通信,就像这样非常优雅.另一方面,如果你的程序不是建立在Twisted之上,那么这实际上并没有那么有用.希望这对其他读者有帮助,即使它不适用于您的特定应用程序.

  • @naxa我不认为他所指的`select()`与你的相同.我假设这是因为`Twisted`适用于Windows ... (2认同)

小智 15

使用select&read(1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))
Run Code Online (Sandbox Code Playgroud)

对于readline() - 如:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a
Run Code Online (Sandbox Code Playgroud)

  • 不好.根据[docs](http://docs.python.org/2/library/select.html),`select`不适用于带文件描述符的窗口 (5认同)
  • 我的天啊。一次读取兆字节,或者可能是千兆字节......这是我很长一段时间以来见过的最糟糕的想法......不用说,这段代码不起作用,因为 `proc.stdout.read() ` 无论参数多小,都是阻塞调用。 (2认同)

use*_*515 15

现代 Python 的情况要好得多。

这是一个简单的子程序“hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")
Run Code Online (Sandbox Code Playgroud)

以及与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

打印出:

b'hello bob\n'
b'hello alice\n'
Run Code Online (Sandbox Code Playgroud)

请注意,实际模式(也是此处和相关问题中几乎所有先前的答案)是将子级的 stdout 文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由 asyncio 提供的。

  • 在我看来,这是最好的答案,它实际上在幕后使用 Windows 重叠/异步读/写(而不是处理阻塞的线程的某些变体)。根据文档,您应该调用 `drain()` 以确保 write(..) 实际通过 (3认同)

saa*_*aaj 12

在类 Unix 系统和 Python 3.5+ 上os.set_blocking,它完全符合它所说的。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()
Run Code Online (Sandbox Code Playgroud)

这输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
Run Code Online (Sandbox Code Playgroud)

随着os.set_blocking评论说这是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Run Code Online (Sandbox Code Playgroud)

  • 这是迄今为止最优雅的解决方案,感谢您让我度过了愉快的一天(实际上是晚上^^) (7认同)
  • 非常优雅,而且非常高效。感谢这个解决方案,它完美运行! (4认同)
  • 谢谢你!当使用带有“Selector”的“Popen”管道时,这非常有效,以确保它永远不会阻塞。 (2认同)
  • 优雅,是的。但它不是多平台(根据问题)。 (2认同)
  • Windows 用户必须将其 Python 更新到 3.12,Windows 上的较低版本不提供此功能。我的 conda 更新只给了我 Python 3.11.5,太伤心了。 (2认同)

小智 9

这是一个基于线程的简单解决方案:

  • 适用于 Linux 和 Windows(不依赖select)。
  • stdout异步读取stderr
  • 不依赖于具有任意等待时间的主动轮询(CPU 友好)。
  • 不使用asyncio(可能与其他库冲突)。
  • 运行直到子进程终止。

打印机.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

读者.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()
Run Code Online (Sandbox Code Playgroud)


mon*_*kut 8

一种解决方案是使另一个进程执行您对进程的读取,或者使进程的线程超时.

这是超时函数的线程版本:

http://code.activestate.com/recipes/473878/

但是,你需要阅读stdout,因为它正在进入?另一种解决方案可能是将输出转储到文件并等待进程使用p.wait()完成.

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()
Run Code Online (Sandbox Code Playgroud)


Vuk*_*man 7

免责声明:这仅适用于龙卷风

您可以通过将fd设置为非阻塞来执行此操作,然后使用ioloop注册回调.我把它打包成一个名为tornado_subprocess的蛋,你可以通过PyPI安装它:

easy_install tornado_subprocess
Run Code Online (Sandbox Code Playgroud)

现在你可以这样做:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)

您也可以将它与RequestHandler一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()
Run Code Online (Sandbox Code Playgroud)


Vik*_*udi 7

现有的解决方案对我不起作用(详情如下).最终工作的是使用read(1)实现readline(基于这个答案).后者不会阻止:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()
Run Code Online (Sandbox Code Playgroud)

为什么现有解决方案不起作用:

  1. 需要readline的解决方案(包括基于Queue的解决方案)始终会阻止.杀死执行readline的线程很困难(不可能?)它只会在创建它的进程完成时被杀死,但不会在生成输出的进程被终止时被杀死.
  2. 正如aonnn所指出的,将低级别fcntl与高级别readline调用混合可能无法正常工作.
  3. 使用select.poll()很整洁,但根据python docs在Windows上不起作用.
  4. 使用第三方库似乎对此任务有些过分,并添加了其他依赖项.


小智 5

我添加此问题以读取一些subprocess.Popen stdout。这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Run Code Online (Sandbox Code Playgroud)

  • 根据[docs](http://docs.python.org/2/library/fcntl.html),fcntl在Windows上不起作用。 (5认同)

dat*_*boy 5

这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。

经过测试并在Python 2.7 linux&Windows上正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()
Run Code Online (Sandbox Code Playgroud)


Tom*_*ime 5

这无阻塞读的版本并不需要特殊的模块,将工作外的开箱上大多数Linux发行版的.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())
Run Code Online (Sandbox Code Playgroud)


归档时间:

查看次数:

209104 次

最近记录:

6 年,4 月 前