usu*_*sul 7 python pipeline python-multithreading
简介:我想在命令行上编写类似于bash脚本的python脚本,但是我还想在python中轻松地将它们连接在一起.我遇到麻烦的地方是制造后者的胶水.
所以,想象一下我写了两个剧本,script1.py和script2.py我能管他们在一起就像这样:
echo input_string | ./script1.py -a -b | ./script2.py -c -d
Run Code Online (Sandbox Code Playgroud)
如何从另一个python文件中获取此行为? 这是我所知道的方式,但我不喜欢:
arg_string_1 = convert_to_args(param_1, param_2)
arg_string_2 = convert_to_args(param_3, param_4)
output_string = subprocess.check_output("echo " + input_string + " | ./script1.py " + arg_string_1 + " | ./script2.py " + arg_string_2)
Run Code Online (Sandbox Code Playgroud)
如果我不想利用多线程,我可以做这样的事情(?):
input1 = StringIO(input_string)
output1 = StringIO()
script1.main(param_1, param_2, input1, output1)
input2 = StringIO(output1.get_value())
output2 = StringIO()
script2.main(param_3, param_4, input2, output2)
Run Code Online (Sandbox Code Playgroud)
这是我尝试的方法,但我坚持写胶水.我很感激学习如何完成下面的方法,或建议更好的设计/方法!
我的方法:我写了script1.py,script2.py看起来像:
#!/usr/bin/python3
... # import sys and define "parse_args"
def main(param_1, param_2, input, output):
for line in input:
...
print(stuff, file=output)
if __name__ == "__main__":
parameter_1, parameter_2 = parse_args(sys.argv)
main(parameter_1, parameter_2, sys.stdin, sys.stdout)
Run Code Online (Sandbox Code Playgroud)
然后我想写这样的东西,但不知道如何完成:
pipe_out, pipe_in = ????
output = StringIO()
thread_1 = Thread(target=script1.main, args=(param_1, param_2, StreamIO(input_string), pipe_out))
thread_2 = Thread(target=script2.main, args=(param_3, param_4, pipe_in, output)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
output_str = output.get_value()
Run Code Online (Sandbox Code Playgroud)
对于“管道输入”,与方法sys.stdin一起使用readlines()。(使用方法read()将一次读取一个字符。)
要将信息从一个线程传递到另一个线程,您可以使用Queue. 您必须定义一种方式来表示数据结束。在我的示例中,由于线程之间传递的所有数据都是str,因此我只需使用一个None对象来表示数据结束(因为它不能出现在传输的数据中)。
还可以使用更多线程,或者在线程中使用不同的函数。
sys.argv为了简单起见,我没有在示例中包含。修改它来获取参数(parameter1,...)应该很容易。
import sys
from threading import Thread
from Queue import Queue
import fileinput
def stdin_to_queue( output_queue ):
for inp_line in sys.stdin.readlines(): # input one line at at time
output_queue.put( inp_line, True, None ) # blocking, no timeout
output_queue.put( None, True, None ) # signal the end of data
def main1(input_queue, output_queue, arg1, arg2):
do_loop = True
while do_loop:
inp_data = input_queue.get(True)
if inp_data is None:
do_loop = False
output_queue.put( None, True, None ) # signal end of data
else:
out_data = arg1 + inp_data.strip('\r\n').upper() + arg2 # or whatever transformation...
output_queue.put( out_data, True, None )
def queue_to_stdout(input_queue):
do_loop = True
while do_loop:
inp_data = input_queue.get(True)
if inp_data is None:
do_loop = False
else:
sys.stdout.write( inp_data )
def main():
q12 = Queue()
q23 = Queue()
q34 = Queue()
t1 = Thread(target=stdin_to_queue, args=(q12,) )
t2 = Thread(target=main1, args=(q12,q23,'(',')') )
t3 = Thread(target=main1, args=(q23,q34,'[',']') )
t4 = Thread(target=queue_to_stdout, args=(q34,))
t1.start()
t2.start()
t3.start()
t4.start()
main()
Run Code Online (Sandbox Code Playgroud)
最后,我用一个文本文件测试了这个程序(python2)。
head sometextfile.txt | python script.py
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
918 次 |
| 最近记录: |