nos*_*klo 34 python subprocess ipc pipe blocking
我正在用python测试子进程管道.我知道我可以直接在python中执行下面的程序,但这不是重点.我只是想测试管道,所以我知道如何使用它.
我的系统是Linux Ubuntu 9.04,默认为python 2.6.
我从这个文档示例开始.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
这样的作品,但由于p1的stdin不被重定向,我在终端类型的东西喂管.当我输入^D关闭stdin时,我得到了我想要的输出.
但是,我想使用python字符串变量将数据发送到管道.首先我尝试写stdin:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
没工作.我尝试p2.stdout.read()在最后一行使用,但它也阻止.我补充说p1.stdin.flush(),p1.stdin.close()但它也没有用.我然后我开始沟通:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 
所以这仍然不是它.
我注意到运行一个进程(p1如上所述,删除p2)非常有效.将文件句柄传递给p1(stdin=open(...))也有效.所以问题是:
是否可以在python中将数据传递给2个或更多个子进程的管道,而不会阻塞?为什么不?
我知道我可以运行shell并在shell中运行管道,但这不是我想要的.
更新1:遵循Aaron Digulla的提示,我现在正在尝试使用线程来使其工作.
首先,我尝试在线程上运行p1.communicate.
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
好的,没用.尝试喜欢它更改为其他组合.write()也p2.read().没有.现在让我们尝试相反的方法:
def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码最终阻塞某处.在生成的线程中,或在主线程中,或两者都有.所以它没有用.如果您知道如何使其工作,那么如果您可以提供工作代码会更容易.我在这里试试.
更新2
Paul Du Bois在下面回答了一些信息,所以我做了更多测试.我已经阅读了整个subprocess.py模块并了解了它的工作原理.所以我尝试将其完全应用于代码.
我在Linux上,但因为我是用线程测试,我的第一种方法是复制精确的Windows线程看到代码subprocess.py的communicate()方法,但对于两个过程,而不是一个.这是我尝试过的完整列表:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
好.它没用.即使在p1.stdin.close()被召唤之后,p2.stdout.read()仍然阻挡.
然后我尝试了posix代码subprocess.py:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)
    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []
print b
也阻止select.select().通过传播print,我发现了这个:
p1.stdin.numwrites,p1.stdin.close()被称为.select()开始阻止时,只有to_read一些东西,p2.stdout.to_write已经空了.os.read()call总是返回一些东西,所以p2.stdout.close()永远不会被调用.两个测试的结论:关闭stdin管道上的第一个进程(grep在示例中)并没有使它将缓冲的输出转储到下一个并且死掉.
没办法让它起作用?
PS:我不想使用临时文件,我已经测试了文件,我知道它有效.而且我不想使用Windows.
nos*_*klo 21
我发现了怎么做.
它不是关于线程,而是关于select().
当我运行第一个进程(grep)时,它会创建两个低级文件描述符,每个管道对应一个.让我们打电话给那些a和b.
当我运行第二个进程时,b传递给cut sdtin.但是有一个脑死亡的默认值Popen- close_fds=False.
这种影响cut也是继承的a.因此,grep即使我关闭也不会死a,因为stdin仍然在cut进程中打开(cut忽略它).
以下代码现在运行完美.
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"
close_fds=True应该是 unix系统的默认值.在Windows上它关闭所有 fds,因此它会阻止管道.
编辑:
PS:对于有类似问题的人来说,阅读这个答案:正如pooryorick在评论中所说,如果写入的数据p1.stdin大于缓冲区,也会阻止.在这种情况下,您应该将数据分成更小的部分,并使用select.select()知道何时读/写.问题中的代码应该提示如何实现它.
编辑2:找到另一个解决方案,在pooryorick的帮助下 - 而不是使用close_fds=True和关闭所有 fds,可以关闭fd属于第一个进程的s,当执行第二个进程时,它将起作用.关闭必须在孩子preexec_fn身上完成,所以Popen 的功能非常方便.执行p2时,您可以执行以下操作:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
在Python中处理大型文件时,需要统一应用两个原则.
EOF在开始取得进展之前等待.另一种方法是使用非阻塞IO,尽管这在标准Python中很麻烦.有关使用非阻塞原语实现同步IO API的轻量级线程库,请参阅gevent.
我们将构建一个粗略的愚蠢管道
{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}
大括号中的每个阶段{}都是用Python实现的,而其他阶段则使用标准的外部程序.TL; DR: 看到这个要点.
我们从预期的进口开始.
#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
除了最后一个Python实现的管道阶段之外的所有阶段都需要进入一个线程,这样它的IO就不会阻塞其他的.如果你想让它们实际并行运行(避免使用GIL),这些可以在Python子进程中运行.
def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()
每个都需要放在自己的线程中,我们将使用这个便利功能.
def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t
使用Popen和使用Python阶段创建外部阶段spawn.参数bufsize=-1说使用系统默认缓冲(通常为4 kiB).这通常比默认(无缓冲)或行缓冲更快,但如果您希望直观地监视输出而没有滞后,则需要行缓冲.
grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)
如上所述,管道中的所有缓冲区都将填满,但由于没有人从end(grepz.stdout)读取,所以它们都将被阻塞.我们可以在一次调用中读取整个内容grepz.stdout.read(),但是这会为大文件使用大量内存.相反,我们逐步阅读.
for line in grepz.stdout:
    sys.stdout.write(line.lower())
线程和进程一旦到达就会清理干净EOF.我们可以明确地清理使用
for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()
在内部,subprocess.Popen调用fork,配置管道文件描述符和调用exec.子进程来自fork父进程中的所有文件描述符的副本,并且需要在相应的阅读器获取之前关闭这两个副本EOF.这可以通过手动关闭管道(通过close_fds=True或适当的preexec_fn参数subprocess.Popen)或通过将FD_CLOEXEC标志设置为exec自动关闭文件描述符来解决.此标志在Python-2.7及更高版本中自动设置,请参见issue12786.我们可以通过调用在早期版本的Python中获得Python-2.7行为
p._set_cloexec_flags(p.stdin)
在p.stdin作为参数传递给后续的之前subprocess.Popen.
| 归档时间: | 
 | 
| 查看次数: | 21388 次 | 
| 最近记录: |