Mat*_*agé 477 python io subprocess nonblocking
我正在使用子进程模块启动子进程并连接到它的输出流(stdout).我希望能够在其标准输出上执行非阻塞读取.有没有办法让.readline非阻塞或在我调用之前检查流上是否有数据.readline
?我希望这是可移植的,或至少在Windows和Linux下工作.
这是我现在如何做到的(.readline
如果没有数据可用,则阻止它):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Run Code Online (Sandbox Code Playgroud)
jfs*_*jfs 393
fcntl
,select
,asyncproc
不会在这种情况下帮助.
无论操作系统如何,无阻塞地读取流的可靠方法是使用Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Run Code Online (Sandbox Code Playgroud)
Jes*_*sse 76
我经常遇到类似的问题; 我经常编写的Python程序需要能够执行一些主要功能,同时从命令行(stdin)接受用户输入.简单地将用户输入处理功能放在另一个线程中并不能解决问题,因为readline()
阻塞并且没有超时.如果主要功能已完成并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但它不能因为readline()
在另一个等待一行的线程中仍然阻塞.我发现这个问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
Run Code Online (Sandbox Code Playgroud)
在我看来,这比使用选择或信号模块解决这个问题要清晰一点,但是它再次只适用于UNIX ...
jfs*_*jfs 38
Python 3.4 为异步IO 模块引入了新的临时API.asyncio
这种方法类似于twisted
@Bryan Ward的基于答案的答案 - 定义一个协议,一旦数据准备就调用它的方法:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
Run Code Online (Sandbox Code Playgroud)
请参阅文档中的"子流程".
有一个高级接口asyncio.create_subprocess_exec()
返回允许使用coroutine异步读取行的Process
对象
(使用/ Python 3.5+语法):StreamReader.readline()
async
await
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
Run Code Online (Sandbox Code Playgroud)
readline_and_kill()
执行以下任务:
如有必要,每个步骤都可以通过超时秒限制.
Noa*_*oah 19
尝试使用asyncproc模块.例如:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Run Code Online (Sandbox Code Playgroud)
该模块负责S.Lott建议的所有线程.
Bry*_*ard 17
您可以在Twisted中轻松完成此操作.根据您现有的代码库,这可能不是那么容易使用,但如果您正在构建一个扭曲的应用程序,那么这样的事情几乎变得微不足道.您创建一个ProcessProtocol
类,并覆盖该outReceived()
方法.Twisted(取决于所使用的反应器)通常只是一个select()
安装了回调的大循环来处理来自不同文件描述符(通常是网络套接字)的数据.因此,该outReceived()
方法只是安装一个回调来处理来自的数据STDOUT
.演示此行为的简单示例如下:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Run Code Online (Sandbox Code Playgroud)
在扭曲的文档对此有一些有用的信息.
如果你围绕Twisted构建整个应用程序,它会与本地或远程的其他进程进行异步通信,就像这样非常优雅.另一方面,如果你的程序不是建立在Twisted之上,那么这实际上并没有那么有用.希望这对其他读者有帮助,即使它不适用于您的特定应用程序.
小智 15
使用select&read(1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Run Code Online (Sandbox Code Playgroud)
对于readline() - 如:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Run Code Online (Sandbox Code Playgroud)
use*_*515 15
现代 Python 的情况要好得多。
这是一个简单的子程序“hello.py”:
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
Run Code Online (Sandbox Code Playgroud)
以及与之交互的程序:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
打印出:
b'hello bob\n'
b'hello alice\n'
Run Code Online (Sandbox Code Playgroud)
请注意,实际模式(也是此处和相关问题中几乎所有先前的答案)是将子级的 stdout 文件描述符设置为非阻塞,然后在某种选择循环中轮询它。当然,现在这个循环是由 asyncio 提供的。
saa*_*aaj 12
在类 Unix 系统和 Python 3.5+ 上os.set_blocking
,它完全符合它所说的。
import os
import time
import subprocess
cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
# first iteration always produces empty byte string in non-blocking mode
for i in range(2):
line = p.stdout.readline()
print(i, line)
time.sleep(0.5)
if time.time() > start + 5:
break
p.terminate()
Run Code Online (Sandbox Code Playgroud)
这输出:
1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'
Run Code Online (Sandbox Code Playgroud)
随着os.set_blocking
评论说这是:
0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
Run Code Online (Sandbox Code Playgroud)
小智 9
这是一个基于线程的简单解决方案:
select
)。stdout
异步读取stderr
。asyncio
(可能与其他库冲突)。打印机.py
import time
import sys
sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
读者.py
import queue
import subprocess
import sys
import threading
def enqueue_stream(stream, queue, type):
for line in iter(stream.readline, b''):
queue.put(str(type) + line.decode('utf-8'))
stream.close()
def enqueue_process(process, queue):
process.wait()
queue.put('x')
p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()
while True:
line = q.get()
if line[0] == 'x':
break
if line[0] == '2': # stderr
sys.stdout.write("\033[0;31m") # ANSI red color
sys.stdout.write(line[1:])
if line[0] == '2':
sys.stdout.write("\033[0m") # reset ANSI code
sys.stdout.flush()
tp.join()
to.join()
te.join()
Run Code Online (Sandbox Code Playgroud)
一种解决方案是使另一个进程执行您对进程的读取,或者使进程的线程超时.
这是超时函数的线程版本:
http://code.activestate.com/recipes/473878/
但是,你需要阅读stdout,因为它正在进入?另一种解决方案可能是将输出转储到文件并等待进程使用p.wait()完成.
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Run Code Online (Sandbox Code Playgroud)
免责声明:这仅适用于龙卷风
您可以通过将fd设置为非阻塞来执行此操作,然后使用ioloop注册回调.我把它打包成一个名为tornado_subprocess的蛋,你可以通过PyPI安装它:
easy_install tornado_subprocess
Run Code Online (Sandbox Code Playgroud)
现在你可以这样做:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Run Code Online (Sandbox Code Playgroud)
您也可以将它与RequestHandler一起使用
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Run Code Online (Sandbox Code Playgroud)
现有的解决方案对我不起作用(详情如下).最终工作的是使用read(1)实现readline(基于这个答案).后者不会阻止:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Run Code Online (Sandbox Code Playgroud)
为什么现有解决方案不起作用:
小智 5
我添加此问题以读取一些subprocess.Popen stdout。这是我的非阻塞读取解决方案:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Run Code Online (Sandbox Code Playgroud)
这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。
经过测试并在Python 2.7 linux&Windows上正确工作。
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Run Code Online (Sandbox Code Playgroud)
这无阻塞读的版本并不需要特殊的模块,将工作外的开箱上大多数Linux发行版的.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
209104 次 |
最近记录: |