muc*_*ckl 10 python multithreading subprocess buffering
我有一个可执行文件,我使用subprocess.Popen调用.然后,我打算通过stdin使用一个线程从stdin中提取一些数据,该线程从队列中读取其值,稍后将在另一个线程中填充.应该使用另一个线程中的stdout管道读取输出,并再次在队列中进行排序.
据我之前的研究所理解,使用带队列的线程是一种很好的做法.
遗憾的是,外部可执行文件不会快速给出每个管道输入的答案,因此简单的写入,读取线周期不是一个选项.可执行文件实现了一些内部多线程,我希望输出一旦可用,就要附加读者线程.
作为测试可执行文件的示例,只需对每行进行随机播放(shuffleline.py):
#!/usr/bin/python -u
import sys
from random import shuffle
for line in sys.stdin:
line = line.strip()
# shuffle line
line = list(line)
shuffle(line)
line = "".join(line)
sys.stdout.write("%s\n"%(line))
sys.stdout.flush() # avoid buffers
Run Code Online (Sandbox Code Playgroud)
请注意,这已经尽可能没有缓冲.或者不是吗?这是我的精简测试程序:
#!/usr/bin/python -u
import sys
import Queue
import threading
import subprocess
class WriteThread(threading.Thread):
def __init__(self, p_in, source_queue):
threading.Thread.__init__(self)
self.pipe = p_in
self.source_queue = source_queue
def run(self):
while True:
source = self.source_queue.get()
print "writing to process: ", repr(source)
self.pipe.write(source)
self.pipe.flush()
self.source_queue.task_done()
class ReadThread(threading.Thread):
def __init__(self, p_out, target_queue):
threading.Thread.__init__(self)
self.pipe = p_out
self.target_queue = target_queue
def run(self):
while True:
line = self.pipe.readline() # blocking read
if line == '':
break
print "reader read: ", line.rstrip()
self.target_queue.put(line)
if __name__ == "__main__":
cmd = ["python", "-u", "./shuffleline.py"] # unbuffered
proc = subprocess.Popen(cmd, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
source_queue = Queue.Queue()
target_queue = Queue.Queue()
writer = WriteThread(proc.stdin, source_queue)
writer.setDaemon(True)
writer.start()
reader = ReadThread(proc.stdout, target_queue)
reader.setDaemon(True)
reader.start()
# populate queue
for i in range(10):
source_queue.put("string %s\n" %i)
source_queue.put("")
print "source_queue empty: ", source_queue.empty()
print "target_queue empty: ", target_queue.empty()
import time
time.sleep(2) # expect some output from reader thread
source_queue.join() # wait until all items in source_queue are processed
proc.stdin.close() # should end the subprocess
proc.wait()
Run Code Online (Sandbox Code Playgroud)
这给出了以下输出(python2.7):
writing to process: 'string 0\n'
writing to process: 'string 1\n'
writing to process: 'string 2\n'
writing to process: 'string 3\n'
writing to process: 'string 4\n'
writing to process: 'string 5\n'
writing to process: 'string 6\n'
source_queue empty: writing to process: 'string 7\n'
writing to process: 'string 8\n'
writing to process: 'string 9\n'
writing to process: ''
True
target_queue empty: True
Run Code Online (Sandbox Code Playgroud)
然后没有2秒......
reader read: rgsn0i t
reader read: nrg1sti
reader read: tis n2rg
reader read: snt gri3
reader read: nsri4 tg
reader read: stir5 gn
reader read: gnri6ts
reader read: ngrits7
reader read: 8nsrt ig
reader read: sg9 nitr
Run Code Online (Sandbox Code Playgroud)
预期开头的交错.但是,子进程的输出直到子进程结束后才会出现.随着更多的管道输入我得到一些输出,因此我假设stdout管道中的缓存问题.根据此处发布的其他问题,flushing stdout(在子进程中)应该可以工作,至少在Linux上是这样.
你的问题与subprocess模块或线程无关(有问题),甚至混合子进程和线程(一个非常糟糕的想法,甚至比使用线程开始更糟糕,除非你使用的是Python 3.2的后端)您可以从code.google.com/p/python-subprocess32获取的子进程模块,或者从多个线程访问相同的东西(就像您的print语句一样).
会发生什么是您的shuffleline.py程序缓冲.不是输出,而是输入.虽然不是很明显,但是当你遍历文件对象时,Python会读取块,通常是8k字节.由于sys.stdin是一个文件对象,你的for循环将缓冲直到EOF或一个完整的块:
for line in sys.stdin:
line = line.strip()
....
Run Code Online (Sandbox Code Playgroud)
如果你不想这样做,可以使用while循环来调用sys.stdin.readline()(返回''EOF):
while True:
line = sys.stdin.readline()
if not line:
break
line = line.strip()
...
Run Code Online (Sandbox Code Playgroud)
或者使用双参数形式iter(),它创建一个迭代器,调用第一个参数,直到返回第二个参数("sentinel"):
for line in iter(sys.stdin.readline, ''):
line = line.strip()
...
Run Code Online (Sandbox Code Playgroud)
如果我不建议不为此使用线程,而是在子twisted.reactor.spawnProcess进程的管道上使用非阻塞I/O,或者甚至有很多方法可以将进程和其他东西连接在一起作为消费者和生产者,我也会失职.
| 归档时间: |
|
| 查看次数: |
6723 次 |
| 最近记录: |