hak*_*anc 16 python legacy io asynchronous
问题: 我有一个设计糟糕的Fortran程序(我无法更改它,我坚持使用它)从stdin和其他输入文件中获取文本输入,并将文本输出结果写入stdout和其他输出文件.输入和输出的大小非常大,我想避免写入硬盘驱动器(慢速操作).我编写了一个函数,迭代几个输入文件的行,我也有多个输出的解析器.我真的不知道程序是否首先读取所有输入然后开始输出,或者在读取输入时开始输出.
目标: 拥有一个为外部程序提供所需功能的函数,并在输出来自程序时解析输出,而无需将数据写入硬盘驱动器上的文本文件.
研究: 使用文件的天真方式是:
from subprocess import PIPE, Popen
def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):
for filename, file_iter in input_files.iteritems():
with open(filename ,'w') as f:
for line in file_iter:
f.write(line + '\n')
p_sub = Popen(
shlex.split(cmd),
stdin = PIPE,
stdout = open('stdout.txt', 'w'),
stderr = open('stderr.txt', 'w'),
bufsize=1
)
for line in stdin_iter:
p_sub.stdin.write(line + '\n')
p_sub.stdin.close()
p_sub.wait()
data = {}
for filename, parse_func in output_files.iteritems():
# The stdout.txt and stderr.txt is included here
with open(filename,'r') as f:
data[filename] = parse_func(
iter(f.readline, b'')
)
return data
Run Code Online (Sandbox Code Playgroud)
我试图和子进程模块一起执行外部程序.使用命名管道和多处理来处理附加的输入/输出文件.我想用一个迭代器(返回输入行)来输入stdin,将stderr保存在列表中,并解析stdout来自外部程序.输入和输出可能非常大,因此使用communicate是不可行的.
我在格式上有一个解析器:
def parser(iterator):
for line in iterator:
# Do something
if condition:
break
some_other_function(iterator)
return data
Run Code Online (Sandbox Code Playgroud)
我查看了这个解决方案,select用来选择合适的流,但我不知道如何使它与我的stdout解析器一起使用以及如何提供stdin.
您应该对Fortran程序的所有输入和输出使用命名管道,以避免写入磁盘.然后,在您的使用者中,您可以使用线程从每个程序的输出源中读取,并将信息添加到Queue以进行有序处理.
为了对此进行建模,我创建了一个python应用程序daemon.py,它从标准输入读取并返回平方根直到EOF.它将所有输入记录到指定为命令行参数的日志文件中,并将平方根打印到stdout,将所有错误打印到stderr.我认为它模拟你的程序(当然输出文件的数量只有一个,但它可以缩放).您可以在此处查看此测试应用程序的源代码.注意显式调用stdout.flush().默认情况下,标准输出是打印缓冲的,这意味着它在结尾输出,消息不会按顺序到达.我希望您的Fortran应用程序不会缓冲其输出.我相信我的示例应用程序可能无法在Windows上运行,因为仅使用Unix select,这在您的情况下无关紧要.
我有我的消费者应用程序,它将守护程序应用程序作为子进程启动,stdin,stdout和stderr重定向到subprocess.PIPEs.这些管道中的每一个都被赋予一个不同的线程,一个用于提供输入,三个用于分别处理日志文件,错误和标准输出.它们都将消息添加到Queue主线程读取的共享中并发送给解析器.
这是我的消费者代码:
import os, random, time
import subprocess
import threading
import Queue
import atexit
def setup():
# make a named pipe for every file the program should write
logfilepipe='logpipe'
os.mkfifo(logfilepipe)
def cleanup():
# put your named pipes here to get cleaned up
logfilepipe='logpipe'
os.remove(logfilepipe)
# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)
# My example iterator that supplies input for the program. You already have an iterator
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
def __init__(self, numiterations=1000):
self.numiterations=numiterations
self.current = 0
def __iter__(self):
return self
def next(self):
self.current += 1
if self.current > self.numiterations:
raise StopIteration
else:
return random.choice(self.__class__.sample_data)
# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source,line):
print "[%s] %s" % (source,line)
# Generic function for sending standard input to the problem.
# p - a process handle returned by subprocess
def input_func(p, queue):
# run the command with output redirected
for line in MyIter(30): # Limit for testing purposes
time.sleep(0.1) # sleep a tiny bit
p.stdin.write(str(line)+'\n')
queue.put(('INPUT', line))
p.stdin.close()
p.wait()
# Once our process has ended, tell the main thread to quit
queue.put(('QUIT', True))
# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
print "Starting to read output for %r" % source
if isinstance(source,str):
# Is a file or named pipe, so open it
source=open(source, 'r') # open file with string name
line = source.readline()
# enqueue and read lines until EOF
while line != '':
queue.put((tag, line.rstrip()))
line = source.readline()
if __name__=='__main__':
cmd='daemon.py'
# set up our FIFOs instead of using files - put file names into setup() and cleanup()
setup()
logfilepipe='logpipe'
# Message queue for handling all output, whether it's stdout, stderr, or a file output by our command
lq = Queue.Queue()
# open the subprocess for command
print "Running command."
p = subprocess.Popen(['/path/to/'+cmd,logfilepipe],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Start threads to handle the input and output
threading.Thread(target=input_func, args=(p, lq)).start()
threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()
# open a thread to read any other output files (e.g. log file) as named pipes
threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()
# Now combine the results from our threads to do what you want
run=True
while(run):
(tag, line) = lq.get()
if tag == 'QUIT':
run=False
else:
parse_func(tag, line)
Run Code Online (Sandbox Code Playgroud)
我的迭代器返回一个随机输入值(其中一些是垃圾导致错误).你的应该是替补.程序将一直运行到输入结束,然后等待子QUIT进程完成,然后将消息排入主线程.我的parse_func显然非常简单,只需打印出消息及其来源,但您应该能够处理某些事情.从输出源读取的函数旨在与PIPE和字符串一起使用 - 不要在主线程上打开管道,因为它们将在输入可用之前阻塞.因此对于文件读取器(例如,读取日志文件),最好让子线程打开文件并阻塞.但是,我们在主线程上生成子进程,因此我们可以将stdin,stdout和stderr的句柄传递给它们各自的子线程.
部分基于此multitail的Python实现.
如果您在发送新作业之前等待结果结束,那么 Fortran 程序在每个作业结束时调用刷新(也可以频繁地调用)非常重要。
该命令取决于编译器,例如GNU fortran CALL FLUSH(unitnumber),或者可以通过关闭输出并再次打开以进行附加来模拟。
您还可以轻松地在末尾写入一些带有许多空白字符的空行,它会填充缓冲区大小,并获得一个新的数据块。5000 个空白字符可能就足够了,但也不能太多,否则会阻塞管道的 Fortran 端。如果您在发送新作业后立即读取这些空行,您甚至不需要非阻塞读取。作业的最后一行可以在数字应用程序中轻松识别。如果您要编写一个“聊天”应用程序,您需要其他人编写的东西。
| 归档时间: |
|
| 查看次数: |
1660 次 |
| 最近记录: |