Yan*_*ani 14 python buffer subprocess pipe flush
我正在使用该subprocess模块从python启动进程.我希望能够在写入/缓冲后立即访问输出(stdout,stderr).
例如,假设我想运行一个名为counter.pyvia a 的python文件subprocess.内容counter.py如下:
import sys
for index in range(10):
# Write data to standard out.
sys.stdout.write(str(index))
# Push buffered data to disk.
sys.stdout.flush()
Run Code Online (Sandbox Code Playgroud)
负责执行该counter.py示例的父进程如下:
import subprocess
command = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
Run Code Online (Sandbox Code Playgroud)
使用该counter.py示例,我可以在流程完成之前访问数据.这很棒!这正是我想要的.但是,删除该sys.stdout.flush()调用会阻止在我想要的时间访问数据.这是不好的!这正是我不想要的.我的理解是flush()调用强制将数据写入磁盘,在将数据写入磁盘之前,它只存在于缓冲区中.请记住,我希望能够运行任何进程.我不希望这个过程执行这种刷新,但我仍然期望数据可以实时(或接近它).有没有办法实现这个目标?
关于父进程的快速说明.您可能会注意到我正在使用bufsize=0线路缓冲.我希望这会导致每行的磁盘刷新,但它似乎不会那样工作.这个论点如何运作?
您还会注意到我正在使用subprocess.PIPE.这是因为它似乎是在父进程和子进程之间生成IO对象的唯一值.我通过查看模块中的Popen._get_handles方法得出了这个结论subprocess(我在这里指的是Windows定义).有两个重要的变量,c2pread而c2pwrite这是基于设置stdout传递到价值Popen构造.例如,如果stdout未设置,则不设置c2pread变量.使用文件描述符和类文件对象时也是如此.我真的不知道这是否重要但我的直觉告诉我,我想要读取和写入IO对象以实现我想要实现的目标 - 这就是我选择的原因subprocess.PIPE.如果有人能够更详细地解释这一点,我将非常感激.同样,如果有令人信服的理由使用subprocess.PIPE我以外的其他东西.
import time
import subprocess
import threading
import Queue
class StreamReader(threading.Thread):
"""
Threaded object used for reading process output stream (stdout, stderr).
"""
def __init__(self, stream, queue, *args, **kwargs):
super(StreamReader, self).__init__(*args, **kwargs)
self._stream = stream
self._queue = queue
# Event used to terminate thread. This way we will have a chance to
# tie up loose ends.
self._stop = threading.Event()
def stop(self):
"""
Stop thread. Call this function to terminate the thread.
"""
self._stop.set()
def stopped(self):
"""
Check whether the thread has been terminated.
"""
return self._stop.isSet()
def run(self):
while True:
# Flush buffered data (not sure this actually works?)
self._stream.flush()
# Read available data.
for line in iter(self._stream.readline, b''):
self._queue.put(line)
# Breather.
time.sleep(0.25)
# Check whether thread has been terminated.
if self.stopped():
break
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
stdout_queue = Queue.Queue()
stdout_reader = StreamReader(process.stdout, stdout_queue)
stdout_reader.daemon = True
stdout_reader.start()
# Read standard out of the child process whilst it is active.
while True:
# Attempt to read available data.
try:
line = stdout_queue.get(timeout=0.1)
print '%s' % line
# If data was not read within time out period. Continue.
except Queue.Empty:
# No data currently available.
pass
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
# Process is no longer active. Nothing more to read. Stop reader thread.
stdout_reader.stop()
Run Code Online (Sandbox Code Playgroud)
在这里,我执行的逻辑从一个线程中的子进程读取标准.这允许在数据可用之前读取阻塞的情况.我们不是等待一段可能很长的时间,而是检查是否有可用的数据,在超时时间内读取,如果没有则继续循环.
我还尝试了另一种使用一种非阻塞读取的方法.此方法使用该ctypes模块访问Windows系统调用.请注意,我并不完全理解我在这里做的事情 - 我只是想了解一些我在其他帖子中看到的示例代码.在任何情况下,以下代码段都无法解决缓冲问题.我的理解是,它只是另一种打击潜在的长读取时间的方法.
import os
import subprocess
import ctypes
import ctypes.wintypes
import msvcrt
cmd = ['python', 'counter.py']
process = subprocess.Popen(
cmd,
bufsize=1,
stdout=subprocess.PIPE,
)
def read_output_non_blocking(stream):
data = ''
available_bytes = 0
c_read = ctypes.c_ulong()
c_available = ctypes.c_ulong()
c_message = ctypes.c_ulong()
fileno = stream.fileno()
handle = msvcrt.get_osfhandle(fileno)
# Read available data.
buffer_ = None
bytes_ = 0
status = ctypes.windll.kernel32.PeekNamedPipe(
handle,
buffer_,
bytes_,
ctypes.byref(c_read),
ctypes.byref(c_available),
ctypes.byref(c_message),
)
if status:
available_bytes = int(c_available.value)
if available_bytes > 0:
data = os.read(fileno, available_bytes)
print data
return data
while True:
# Read standard out for child process.
stdout = read_output_non_blocking(process.stdout)
print stdout
# Check whether child process is still active.
if process.poll() != None:
# Process is no longer active.
break
Run Code Online (Sandbox Code Playgroud)
评论非常感谢.
干杯
Mar*_*ers 10
这里的问题是由子进程缓冲.你的subprocess代码已经可以正常工作,但是如果你有一个缓冲其输出的子进程,那么subprocess管道可以做任何事情.
我不能强调这一点:你看到的缓冲延迟是子进程的责任,它处理缓冲的方式与subprocess模块无关.
你已经发现了这个; 这就是为什么添加sys.stdout.flush()子进程会使数据更快地显示出来的原因; 子进程在将其发送到sys.stdout管道1之前使用缓冲的I/O(内存高速缓存来收集写入的数据).
当sys.stdout连接到终端时,Python自动使用行缓冲; 只要写入换行符,缓冲区就会刷新.使用管道时,sys.stdout未连接到终端,而是使用固定大小的缓冲区.
现在,可以告诉Python子进程以不同方式处理缓冲; 您可以设置环境变量或使用命令行开关来改变它sys.stdout(和sys.stderr和sys.stdin)使用缓冲的方式.从Python命令行文档:
-u
强制stdin,stdout和stderr完全无缓冲.在重要的系统上,还将stdin,stdout和stderr置于二进制模式.[...]
PYTHONUNBUFFERED
如果将其设置为非空字符串,则等同于指定-u选项.
如果您正在处理非 Python进程的子进程并且遇到缓存问题,那么您需要查看这些进程的文档,看看它们是否可以切换为使用无缓冲的I/O,或者切换到更理想的缓冲策略.
您可以尝试的一件事是使用该script -c命令为子进程提供伪终端.但是,这是一个POSIX工具,可能在Windows上不可用.
1. 应该注意的是,当冲洗管道时,没有数据被"写入磁盘"; 所有数据都保留在内存中.I/O缓冲区只是内存缓存,通过处理更大的块中的数据来从I/O中获得最佳性能.只有拥有基于磁盘的文件对象才会将fileobj.flush()其推送到操作系统,这通常意味着数据确实写入磁盘.