如何使python脚本在bash和python中都可以管道化

usu*_*sul 7 python pipeline python-multithreading

简介:我想在命令行上编写类似于bash脚本的python脚本,但是我还想在python中轻松地将它们连接在一起.我遇到麻烦的地方是制造后者的胶水.

所以,想象一下我写了两个剧本,script1.pyscript2.py我能管他们在一起就像这样:

echo input_string | ./script1.py -a -b | ./script2.py -c -d
Run Code Online (Sandbox Code Playgroud)

如何从另一个python文件中获取此行为? 这是我所知道的方式,但我不喜欢:

arg_string_1 = convert_to_args(param_1, param_2)
arg_string_2 = convert_to_args(param_3, param_4)
output_string = subprocess.check_output("echo " + input_string + " | ./script1.py " + arg_string_1 + " | ./script2.py " + arg_string_2)
Run Code Online (Sandbox Code Playgroud)

如果我不想利用多线程,我可以做这样的事情(?):

input1  = StringIO(input_string)
output1 = StringIO()
script1.main(param_1, param_2, input1, output1)
input2  = StringIO(output1.get_value())
output2 = StringIO()
script2.main(param_3, param_4, input2, output2)
Run Code Online (Sandbox Code Playgroud)

这是我尝试的方法,但我坚持写胶水.我很感激学习如何完成下面的方法,或建议更好的设计/方法!

我的方法:我写了script1.py,script2.py看起来像:

#!/usr/bin/python3

... # import sys and define "parse_args"

def main(param_1, param_2, input, output):
   for line in input:
     ...
     print(stuff, file=output)

if __name__ == "__main__":
  parameter_1, parameter_2 = parse_args(sys.argv)
  main(parameter_1, parameter_2, sys.stdin, sys.stdout)
Run Code Online (Sandbox Code Playgroud)

然后我想写这样的东西,但不知道如何完成:

pipe_out, pipe_in = ????
output = StringIO()
thread_1 = Thread(target=script1.main, args=(param_1, param_2, StreamIO(input_string), pipe_out))
thread_2 = Thread(target=script2.main, args=(param_3, param_4, pipe_in, output)
thread_1.start()
thread_2.start()
thread_1.join()
thread_2.join()
output_str = output.get_value()
Run Code Online (Sandbox Code Playgroud)

Sci*_*rog 2

对于“管道输入”,与方法sys.stdin一起使用readlines()。(使用方法read()将一次读取一个字符。)

要将信息从一个线程传递到另一个线程,您可以使用Queue. 您必须定义一种方式来表示数据结束。在我的示例中,由于线程之间传递的所有数据都是str,因此我只需使用一个None对象来表示数据结束(因为它不能出现在传输的数据中)。

还可以使用更多线程,或者在线程中使用不同的函数。

sys.argv为了简单起见,我没有在示例中包含。修改它来获取参数(parameter1,...)应该很容易。

import sys
from threading import Thread
from Queue import Queue
import fileinput

def stdin_to_queue( output_queue ):
  for inp_line in sys.stdin.readlines():     # input one line at at time                                                
    output_queue.put( inp_line, True, None )  # blocking, no timeout
  output_queue.put( None, True, None )    # signal the end of data                                                  


def main1(input_queue, output_queue, arg1, arg2):
  do_loop = True
  while do_loop:
    inp_data = input_queue.get(True)
    if inp_data is None:
      do_loop = False
      output_queue.put( None, True, None )  # signal end of data                                                    
    else:
      out_data = arg1 + inp_data.strip('\r\n').upper() + arg2 #  or whatever transformation...                                    
      output_queue.put( out_data, True, None )

def queue_to_stdout(input_queue):
  do_loop = True
  while do_loop:
    inp_data = input_queue.get(True)
    if inp_data is None:
      do_loop = False
    else:
      sys.stdout.write( inp_data )


def main():
  q12 = Queue()
  q23 = Queue()
  q34 = Queue()
  t1 = Thread(target=stdin_to_queue, args=(q12,) )
  t2 = Thread(target=main1, args=(q12,q23,'(',')') )
  t3 = Thread(target=main1, args=(q23,q34,'[',']') )
  t4 = Thread(target=queue_to_stdout, args=(q34,))
  t1.start()
  t2.start()
  t3.start()
  t4.start()


main()
Run Code Online (Sandbox Code Playgroud)

最后,我用一个文本文件测试了这个程序(python2)。

head sometextfile.txt | python script.py 
Run Code Online (Sandbox Code Playgroud)