如何加快与子进程的通信

Ale*_*lds 9 python multithreading subprocess python-multithreading

我使用Python 2 subprocessthreading螺纹采取标准输入,处理它与二进制文件A,B以及C和写入修改的数据标准输出.

这个脚本(让我们称之为:) A_to_C.py非常慢,我想学习如何解决它.

一般流程如下:

A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))

B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))

C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin)) 

produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()

produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()

A_process.wait()
B_process.wait()
C_process.wait()
Run Code Online (Sandbox Code Playgroud)

这个想法是标准输入进入A_to_C.py:

  1. A二进制处理标准输入的块,并创建A与该功能-output produceA.
  2. B二进制过程的块A的标准输出并产生B通过功能-output produceB.
  3. C二进制过程的块B通过功能的标准输出produceC并写入C-output标准输出.

我使用cProfile进行了分析,这个脚本中几乎所有的时间似乎都花在获取线程锁上.

例如,在测试417s作业中,416s(总运行时间的99%)用于获取线程锁:

$ python                                                                                                                                                                                                                                         
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)                                                                                                                                                                                                                                              
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2                                                                                                                                                                                                                                              
Type "help", "copyright", "credits" or "license" for more information.                                                                                                                                                                                                                        
>>> import pstats                                                                                                                                                                                                                                                                             
>>> p = pstats.Stats('1.profile')                                                                                                                                                                                                                                                             
>>> p.sort_stats('cumulative').print_stats(10)                                                                                                                                                                                                                                                
Thu Jun 12 22:19:07 2014    1.profile                                                                                                                                                                                                                                                         

         1755 function calls (1752 primitive calls) in 417.203 CPU seconds                                                                                                                                                                                                                    

   Ordered by: cumulative time                                                                                                                                                                                                                                                                
   List reduced from 162 to 10 due to restriction <10>                                                                                                                                                                                                                                        

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)                                                                                                                                                                                                                       
        1    0.020    0.020  417.203  417.203 A_to_C.py:90(<module>)                                                                                                                                                                                  
        1    0.000    0.000  417.123  417.123 A_to_C.py:809(main)                                                                                                                                                                                     
        6    0.000    0.000  416.424   69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)                                                                                                                                                                         
       32  416.424   13.013  416.424   13.013 {method 'acquire' of 'thread.lock' objects}                                                                                                                                                                                                     
        3    0.000    0.000  416.422  138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)                                                                                                                                                                         
        3    0.000    0.000    0.498    0.166 A_to_C.py:473(which)                                                                                                                                                                                    
       37    0.000    0.000    0.498    0.013 A_to_C.py:475(is_exe)                                                                                                                                                                                   
        3    0.496    0.165    0.496    0.165 {posix.access}                                                                                                                                                                                                                                  
        6    0.000    0.000    0.194    0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)                                                                                                                                                           
        3    0.000    0.000    0.191    0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
Run Code Online (Sandbox Code Playgroud)

我的错误threading.Thread和/或subprocess.Popen安排导致了这个问题我做错了什么?

dan*_*ano 10

我认为你只是被cProfile的工作方式误导了.例如,这是一个使用两个线程的简单脚本:

#!/usr/bin/python

import threading
import time

def f():
    time.sleep(10)


def main():
    t = threading.Thread(target=f)
    t.start()
    t.join()
Run Code Online (Sandbox Code Playgroud)

如果我使用cProfile测试这个,这是我得到的:

>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
         60 function calls in 10.011 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   10.011   10.011 <string>:1(<module>)
        1    0.000    0.000   10.011   10.011 test.py:10(main)
        1    0.000    0.000    0.000    0.000 threading.py:1008(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:1152(currentThread)
        2    0.000    0.000    0.000    0.000 threading.py:241(Condition)
        2    0.000    0.000    0.000    0.000 threading.py:259(__init__)
        2    0.000    0.000    0.000    0.000 threading.py:293(_release_save)
        2    0.000    0.000    0.000    0.000 threading.py:296(_acquire_restore)
        2    0.000    0.000    0.000    0.000 threading.py:299(_is_owned)
        2    0.000    0.000   10.011    5.005 threading.py:308(wait)
        1    0.000    0.000    0.000    0.000 threading.py:541(Event)
        1    0.000    0.000    0.000    0.000 threading.py:560(__init__)
        2    0.000    0.000    0.000    0.000 threading.py:569(isSet)
        4    0.000    0.000    0.000    0.000 threading.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:602(wait)
        1    0.000    0.000    0.000    0.000 threading.py:627(_newname)
        5    0.000    0.000    0.000    0.000 threading.py:63(_note)
        1    0.000    0.000    0.000    0.000 threading.py:656(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:709(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:726(start)
        1    0.000    0.000   10.010   10.010 threading.py:911(join)
       10   10.010    1.001   10.010    1.001 {method 'acquire' of 'thread.lock' objects}
        2    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        4    0.000    0.000    0.000    0.000 {method 'release' of 'thread.lock' objects}
        4    0.000    0.000    0.000    0.000 {thread.allocate_lock}
        2    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,它说几乎所有的时间都花在获取锁上.当然,我们知道这并不能准确反映脚本的作用.所有的时间实际上都是在time.sleep里面打电话f().高tottime的的acquire通话仅仅是因为join在等待f完成,这意味着它必须坐在那里,等待获得锁.但是,cProfile根本没有显示任何时间f.我们可以清楚地看到实际发生的事情,因为示例代码非常简单,但在更复杂的程序中,这个输出非常容易引起误解.

您可以使用另一个分析库(如yappi)获得更可靠的结果:

>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()

Clock type: wall
Ordered by: totaltime, desc

name                                    #n         tsub      ttot      tavg
<stdin>:1 <module>                      2/1        0.000025  10.00801  5.004003
test.py:10 main                         1          0.000060  10.00798  10.00798
..2.7/threading.py:308 _Condition.wait  2          0.000188  10.00746  5.003731
..thon2.7/threading.py:911 Thread.join  1          0.000039  10.00706  10.00706
..ython2.7/threading.py:752 Thread.run  1          0.000024  10.00682  10.00682
test.py:6 f                             1          0.000013  10.00680  10.00680
..hon2.7/threading.py:726 Thread.start  1          0.000045  0.000608  0.000608
..thon2.7/threading.py:602 _Event.wait  1          0.000029  0.000484  0.000484
..2.7/threading.py:656 Thread.__init__  1          0.000064  0.000250  0.000250
..on2.7/threading.py:866 Thread.__stop  1          0.000025  0.000121  0.000121
..lib/python2.7/threading.py:541 Event  1          0.000011  0.000101  0.000101
..python2.7/threading.py:241 Condition  2          0.000025  0.000094  0.000047
..hreading.py:399 _Condition.notifyAll  1          0.000020  0.000090  0.000090
..2.7/threading.py:560 _Event.__init__  1          0.000018  0.000090  0.000090
..thon2.7/encodings/utf_8.py:15 decode  2          0.000031  0.000071  0.000035
..threading.py:259 _Condition.__init__  2          0.000064  0.000069  0.000034
..7/threading.py:372 _Condition.notify  1          0.000034  0.000068  0.000068
..hreading.py:299 _Condition._is_owned  3          0.000017  0.000040  0.000013
../threading.py:709 Thread._set_daemon  1          0.000018  0.000035  0.000035
..ding.py:293 _Condition._release_save  2          0.000019  0.000033  0.000016
..thon2.7/threading.py:63 Thread._note  7          0.000020  0.000020  0.000003
..n2.7/threading.py:1152 currentThread  2          0.000015  0.000019  0.000009
..g.py:296 _Condition._acquire_restore  2          0.000011  0.000017  0.000008
../python2.7/threading.py:627 _newname  1          0.000014  0.000014  0.000014
..n2.7/threading.py:58 Thread.__init__  4          0.000013  0.000013  0.000003
..threading.py:1008 _MainThread.daemon  1          0.000004  0.000004  0.000004
..hon2.7/threading.py:569 _Event.isSet  2          0.000003  0.000003  0.000002
Run Code Online (Sandbox Code Playgroud)

有了yappi,更容易看到时间花在了f.

我怀疑你会发现,在现实中,大多数的脚本的时间都花在做力所能及的工作正在做的produceA,produceBproduceC.


Ale*_*x I 5

TL; DR如果您的程序运行速度低于预期,可能是由于中间函数的功能而不是IPC或线程的细节.使用模拟函数和进程(尽可能简单)进行测试,以仅隔离向/从子进程传递数据的开销.在基于您的代码的基准测试中(下面),将数据传递到子进程或从子进程传递数据时的性能似乎大致相当于直接使用shell管道; python在这个任务上并不是特别慢.

原始代码发生了什么

原始代码的一般形式是:

def produceB(from_stream, to_stream):
    while True:
        buf = from_stream.read()
        processed_buf = do_expensive_calculation(buf)
        to_stream.write(processed_buf)
Run Code Online (Sandbox Code Playgroud)

这里读取和写入之间的计算大约占所有进程(主要和次要)组合的总CPU时间的2/3 - 这是cpu时间,而不是挂钟时间btw.

我认为这可以防止I/O全速运行.读取和写入以及计算都需要有自己的线程,队列在读取和计算之间以及计算和写入之间提供缓冲(因为管道提供的缓冲量不足,我相信).

我在下面说明如果在读取和写入之间没有处理(或者等效地:如果中间处理在单独的线程中完成),则线程+子进程的吞吐量非常高.也可以为读写提供单独的线程; 这增加了一些开销,但使写入不是块读取,反之亦然.三个线程(读取,写入和处理)甚至更好,然后两个步骤都不会阻塞其他线程(当然,在队列大小的限制范围内).

一些基准

下面的所有基准测试都是在Ubuntu 14.04LTS 64bit(Intel i7,Ivy Bridge,四核)上的python 2.7.6上.测试是在两个dd进程之间以4KB块传输大约1GB的数据,并通过python作为中介传递数据.dd进程使用中等大小(4KB)块; 典型的文本I/O会更小(除非它被解释器巧妙地缓冲等),典型的二进制I/O当然要大得多.我有一个基于你如何做到这一点的例子,以及一个基于我前一段时间尝试的替代方法的例子(结果证明速度较慢).顺便说一句,感谢发布这个问题,它很有用.

线程和阻塞I/O.

首先,让我们将问题中的原始代码转换为稍微简单的自包含示例.这只是与一个线程进行通信的两个进程,该线程将数据从一个泵送到另一个,执行阻塞读取和写入.

import subprocess, threading

A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def convert_A_to_B(src, dst):
    read_size = 8*1024
    while True:
        try:
            buf = src.read(read_size)
            if len(buf) == 0:  # This is a bit hacky, but seems to reliably happen when the src is closed
                break
            dst.write(buf)
        except ValueError as e: # Reading or writing on a closed fd causes ValueError, not IOError
            print str(e)
            break

convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))

convert_A_to_B_thread.start()

# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()

A_process.wait()
B_process.stdin.close()
B_process.wait()
Run Code Online (Sandbox Code Playgroud)

结果:

244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s

real    0m0.678s
user    0m0.657s
sys 0m1.273s
Run Code Online (Sandbox Code Playgroud)

不错!事实证明,在这种情况下,理想的读取大小约为8k-16KB,小得多,而更大的尺寸稍微慢一些.这可能与我们要求dd使用的4KB块大小有关.

选择和非阻塞I/O.

当我之前看到这种类型的问题时,我朝着使用select(),非阻塞I/O和单个线程的方向前进.这方面的一个例子是我的问题:如何异步读取和写入子进程?.这是为了从两个并行的过程中读取,我将其扩展到读取一个过程并写入另一个过程.非阻塞写入的大小限制为PIPE_BUF或更小,在我的系统上为4KB; 为简单起见,读取也是4KB,尽管它们可以是任何大小.这有几个奇怪的角落情况(和莫名其妙的挂起,取决于细节)但在下面的表格中它可靠地工作.

import subprocess, select, fcntl, os, sys

p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)

def make_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)

print "PIPE_BUF = %d" % (select.PIPE_BUF)

read_size = select.PIPE_BUF
max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []

while True:
    inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[]) 

    for fd in inputready: 
        if fd == p1.stdout.fileno():
            if len(bufs) < max_buf_len:
                data = p1.stdout.read(read_size)
                bufs.append(data)
    for fd in outputready: 
        if fd == p2.stdin.fileno() and len(bufs) > 0:
            data = bufs.pop(0)
            p2.stdin.write(data)

    p1.poll()
    # If the first process is done and there is nothing more to write out
    if p1.returncode != None and len(bufs) == 0:
        # Again cleanup is tricky.  We expect the second process to finish soon after its input is closed
        p2.stdin.close()
        p2.wait()
        p1.wait()
        break
Run Code Online (Sandbox Code Playgroud)

结果:

PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s

real    0m3.167s
user    0m2.719s
sys 0m2.373s
Run Code Online (Sandbox Code Playgroud)

然而,这比上面的版本要慢得多(即使两者的读/写大小都是4KB,对于苹果对苹果的比较).我不知道为什么.

PS延迟添加:似乎可以忽略或超过PIPE_BUF.这会导致从p2.stdin.write()(errno = 11,暂时不可用)大部分时间抛出IOError异常,大概是当管道中有足够的空间来写东西时,但是小于我们请求的完整大小.上面的相同代码read_size = 64*1024,以及捕获和忽略的异常,以1.4GB/s的速度运行.

直接管道

就像一个基线一样,使用shell版本的管道(在子进程中)运行它的速度有多快?我们来看一下:

import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)
Run Code Online (Sandbox Code Playgroud)

结果:

244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s

real    0m0.466s
user    0m0.300s
sys 0m0.590s
Run Code Online (Sandbox Code Playgroud)

这明显比线程python示例快.但是,这只是一个副本,而线程python版本正在做两个(进入和退出python).修改命令以"dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k"使性能达到1.6GB,与python示例一致.

如何在完整的系统中运行比较

关于如何在完整系统中进行比较的一些额外想法.同样为了简单起见,这只是两个过程,并且两个脚本具有完全相同的convert_A_to_B()功能.

脚本1:在python中传递数据,如上所述

A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...
Run Code Online (Sandbox Code Playgroud)

脚本2:比较脚本,在shell中传递数据

convert_A_to_B(sys.stdin, sys.stdout)
Run Code Online (Sandbox Code Playgroud)

在shell中运行它: A | python script_2.py | B

这允许在完整系统中进行苹果对苹果的比较,而无需使用模拟函数/过程.

块读取大小如何影响结果

对于此测试,使用上面第一个(线程)示例中的代码,并且两者dd和python脚本都设置为使用相同的块大小读/写.

| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |
Run Code Online (Sandbox Code Playgroud)

从理论上讲,缓冲区应该有更好的性能(可能高达缓存效果)但实际上Linux管道使用非常大的缓冲区会减慢,即使使用纯shell管道也是如此.


Ole*_*leg 1

对 subprocess.Popen() 的调用隐式指定 bufsize 的默认值 0,这会强制使用无缓冲 I/O。尝试添加合理的缓冲区大小(4K、16K,甚至 1M),看看是否有任何区别。