blocks - 将输入发送到python子进程管道

nos*_*klo 34 python subprocess ipc pipe blocking

我正在用python测试子进程管道.我知道我可以直接在python中执行下面的程序,但这不是重点.我只是想测试管道,所以我知道如何使用它.

我的系统是Linux Ubuntu 9.04,默认为python 2.6.

我从这个文档示例开始.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
Run Code Online (Sandbox Code Playgroud)

这样的作品,但由于p1stdin不被重定向,我在终端类型的东西喂管.当我输入^D关闭stdin时,我得到了我想要的输出.

但是,我想使用python字符串变量将数据发送到管道.首先我尝试写stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
Run Code Online (Sandbox Code Playgroud)

没工作.我尝试p2.stdout.read()在最后一行使用,但它也阻止.我补充说p1.stdin.flush(),p1.stdin.close()但它也没有用.我然后我开始沟通:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 
Run Code Online (Sandbox Code Playgroud)

所以这仍然不是它.

我注意到运行一个进程(p1如上所述,删除p2)非常有效.将文件句柄传递给p1(stdin=open(...))也有效.所以问题是:

是否可以在python中将数据传递给2个或更多个子进程的管道,而不会阻塞?为什么不?

我知道我可以运行shell并在shell中运行管道,但这不是我想要的.


更新1:遵循Aaron Digulla的提示,我现在正在尝试使用线程来使其工作.

首先,我尝试在线程上运行p1.communicate.

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
Run Code Online (Sandbox Code Playgroud)

好的,没用.尝试喜欢它更改为其他组合.write()p2.read().没有.现在让我们尝试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()
Run Code Online (Sandbox Code Playgroud)

代码最终阻塞某处.在生成的线程中,或在主线程中,或两者都有.所以它没有用.如果您知道如何使其工作,那么如果您可以提供工作代码会更容易.我在这里试试.


更新2

Paul Du Bois在下面回答了一些信息,所以我做了更多测试.我已经阅读了整个subprocess.py模块并了解了它的工作原理.所以我尝试将其完全应用于代码.

我在Linux上,但因为我是用线程测试,我的第一种方法是复制精确的Windows线程看到代码subprocess.pycommunicate()方法,但对于两个过程,而不是一个.这是我尝试过的完整列表:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
Run Code Online (Sandbox Code Playgroud)

好.它没用.即使在p1.stdin.close()被召唤之后,p2.stdout.read()仍然阻挡.

然后我尝试了posix代码subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b
Run Code Online (Sandbox Code Playgroud)

也阻止select.select().通过传播print,我发现了这个:

  • 阅读工作正常.代码在执行期间多次读取.
  • 写作也有效.数据被写入p1.stdin.
  • 最后numwrites,p1.stdin.close()被称为.
  • select()开始阻止时,只有to_read一些东西,p2.stdout.to_write已经空了.
  • os.read()call总是返回一些东西,所以p2.stdout.close()永远不会被调用.

两个测试的结论:关闭stdin管道上的第一个进程(grep在示例中)并没有使它将缓冲的输出转储到下一个并且死掉.

没办法让它起作用?

PS:我不想使用临时文件,我已经测试了文件,我知道它有效.而且我不想使用Windows.

nos*_*klo 21

我发现了怎么做.

它不是关于线程,而是关于select().

当我运行第一个进程(grep)时,它会创建两个低级文件描述符,每个管道对应一个.让我们打电话给那些ab.

当我运行第二个进程时,b传递给cut sdtin.但是有一个脑死亡的默认值Popen- close_fds=False.

这种影响cut也是继承的a.因此,grep即使我关闭也不会死a,因为stdin仍然在cut进程中打开(cut忽略它).

以下代码现在运行完美.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"
Run Code Online (Sandbox Code Playgroud)

close_fds=True应该是 unix系统的默认值.在Windows上它关闭所有 fds,因此它会阻止管道.

编辑:

PS:对于有类似问题的人来说,阅读这个答案:正如pooryorick在评论中所说,如果写入的数据p1.stdin大于缓冲区,也会阻止.在这种情况下,您应该将数据分成更小的部分,并使用select.select()知道何时读/写.问题中的代码应该提示如何实现它.

编辑2:找到另一个解决方案,在pooryorick的帮助下 - 而不是使用close_fds=True和关闭所有 fds,可以关闭fd属于第一个进程的s,当执行第二个进程时,它将起作用.关闭必须在孩子preexec_fn身上完成,所以Popen 的功能非常方便.执行p2时,您可以执行以下操作:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
Run Code Online (Sandbox Code Playgroud)

  • close_fds = True是python 3.3中的默认值,但不是2.7 (2认同)

Jed*_*Jed 6

使用大文件

在Python中处理大型文件时,需要统一应用两个原则.

  1. 由于任何IO例程都可以阻塞,我们必须将管道的每个阶段保持在不同的线程或进程中.我们在这个例子中使用线程,但子进程可以让你避免使用GIL.
  2. 我们必须使用增量读写,这样我们才能EOF在开始取得进展之前等待.

另一种方法是使用非阻塞IO,尽管这在标准Python中很麻烦.有关使用非阻塞原语实现同步IO API的轻量级线程库,请参阅gevent.

示例代码

我们将构建一个粗略的愚蠢管道

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}
Run Code Online (Sandbox Code Playgroud)

大括号中的每个阶段{}都是用Python实现的,而其他阶段则使用标准的外部程序.TL; DR: 看到这个要点.

我们从预期的进口开始.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
Run Code Online (Sandbox Code Playgroud)

管道的Python阶段

除了最后一个Python实现的管道阶段之外的所有阶段都需要进入一个线程,这样它的IO就不会阻塞其他的.如果你想让它们实际并行运行(避免使用GIL),这些可以在Python子进程中运行.

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()
Run Code Online (Sandbox Code Playgroud)

每个都需要放在自己的线程中,我们将使用这个便利功能.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t
Run Code Online (Sandbox Code Playgroud)

创建管道

使用Popen和使用Python阶段创建外部阶段spawn.参数bufsize=-1说使用系统默认缓冲(通常为4 kiB).这通常比默认(无缓冲)或行缓冲更快,但如果您希望直观地监视输出而没有滞后,则需要行缓冲.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)
Run Code Online (Sandbox Code Playgroud)

驱动管道

如上所述,管道中的所有缓冲区都将填满,但由于没有人从end(grepz.stdout)读取,所以它们都将被阻塞.我们可以在一次调用中读取整个内容grepz.stdout.read(),但是这会为大文件使用大量内存.相反,我们逐步阅读.

for line in grepz.stdout:
    sys.stdout.write(line.lower())
Run Code Online (Sandbox Code Playgroud)

线程和进程一旦到达就会清理干净EOF.我们可以明确地清理使用

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()
Run Code Online (Sandbox Code Playgroud)

Python-2.6及更早版本

在内部,subprocess.Popen调用fork,配置管道文件描述符和调用exec.子进程来自fork父进程中的所有文件描述符的副本,并且需要在相应的阅读器获取之前关闭这两个副本EOF.这可以通过手动关闭管道(通过close_fds=True或适当的preexec_fn参数subprocess.Popen)或通过将FD_CLOEXEC标志设置为exec自动关闭文件描述符来解决.此标志在Python-2.7及更高版本中自动设置,请参见issue12786.我们可以通过调用在早期版本的Python中获得Python-2.7行为

p._set_cloexec_flags(p.stdin)
Run Code Online (Sandbox Code Playgroud)

p.stdin作为参数传递给后续的之前subprocess.Popen.