Python concurrent.futures.ProcessPoolExecutor：.submit() 与 .map() 的性能比较

Sun*_*ear 13 python concurrency performance python-3.x concurrent.futures

我使用concurrent.futures.ProcessPoolExecutor来查找数字范围内的数字的出现.目的是调查从并发中获得的加速性能的数量.为了测试性能,我有一个控件 - 一个执行所述任务的串行代码(如下所示).我编写了2个并发代码,一个使用concurrent.futures.ProcessPoolExecutor,另一个concurrent.futures.ProcessPoolExecutor.submit()用于执行相同的任务.它们如下所示.关于起草前者和后者的建议可分别在这里这里看到.

发给所有三个代码的任务是在0到1E8的数字范围内找到数字5的出现次数.无论concurrent.futures.ProcessPoolExecutor.map().submit()被指派6名工人,并.map()有10000 CHUNKSIZE.在并发代码中,分离工作负载的方式是相同的.但是,用于在两个代码中查找出现的函数是不同的.这是因为参数传递给.submit()和.map()调用的函数的方式不同.

所有3个代码报告的出现次数相同，即56,953,279次。但是，完成任务所需的时间却非常不同：.map() 完成任务所需的时间是串行控制代码的两倍，而 .submit() 完成任务的速度则比控制代码快两倍。

问题:

  1. 我想知道.map()我的编码是否是一个神器,或者它本身就很慢?"如果是前者,我怎么能改进它.我只是惊讶它表现得比控制慢,因为没有多少激励使用它.
  2. 我想知道是否还有.submit()更快的代码执行.我有一个条件是函数.map()必须返回一个包含数字5的数字/出现次数的iterable.

基准测试结果
基准结果

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Count occurrences of *number* in [0, nmax) by splitting the range
       into one chunk per worker and farming the chunks out with
       concurrent.futures.ProcessPoolExecutor.submit().

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool (and number of chunks).

       Returns:
           list of all matching integers, grouped by chunk (ascending).
    '''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            # Last chunk absorbs any remainder from the integer division.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Consolidate results in submission order so the returned list
        #      stays sorted. NOTE: the original called cf.as_completed(futures)
        #      here without iterating the result, which was a no-op; removed.
        for future in futures:
            try:
                # .result() is what can actually raise (it re-raises worker
                # exceptions), so it -- not list.append -- sits in the try.
                found.extend(future.result())
            except Exception:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark parameters.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    matches = _concurrent_submit(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Run Code Online (Sandbox Code Playgroud)

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Find occurrences of *number* in [0, nmax) with one
       ProcessPoolExecutor.map() call per worker-sized chunk.

       Each map() call ships every integer of its chunk to the pool
       individually (batched by chunksize=10000 for IPC), so per-item
       transfer overhead is far higher than in the .submit() variant.

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool (and number of chunks).

       Returns:
           list of all matching integers, in ascending order.
    '''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    results = []  # iterators returned by executor.map(), one per chunk
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            # Last chunk absorbs any remainder from the integer division.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            results.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for result in results:
            for f in result:
                # 'is not None' (not truthiness) so a legitimate match of 0
                # is not silently dropped (e.g. when number='0').
                if f is not None:
                    found.append(f)
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark parameters.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    matches = _concurrent_map(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Run Code Online (Sandbox Code Playgroud)

串行码:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    # Benchmark parameters for the serial control run.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    t0 = time()
    matches = _serial(nmax, number)
    elapsed = time() - t0
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Run Code Online (Sandbox Code Playgroud)

2017年2月13日更新:

除了@niemmi的答案,我还提供了一些个人研究后的答案:

  1. 如何进一步加速@niemmi的.map()和.submit()解决方案,以及
  2. 当ProcessPoolExecutor.map()可以导致比ProcessPoolExecutor.submit()更快的速度.

nie*_*mmi 8

你在这里比较苹果和橙子。使用时,map您生成所有1E8数字并将它们传输到工作进程。与实际执行相比,这需要很多时间。使用时,submit您只需创建 6 组可传输的参数。

如果您更改map为以相同的原则操作,您将获得彼此接近的数字:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Find occurrences of *number* in [0, nmax) with a single
       ProcessPoolExecutor.map() call that passes chunk *boundaries*
       (not every individual integer) to the workers.

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool (and number of chunks).

       Returns:
           list of all matching integers.
    '''
    # 1. Local variables (the original's dead 'futures = []' init, which
    #    was immediately overwritten below, has been removed).
    start = time()
    chunk = nmax // workers
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool.
        #      Generators of chunk boundaries; the final stop is forced to
        #      nmax so the remainder of the integer division is covered.
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate result as a list and return this list.
        #      Each item yielded by map() is one chunk's list of matches;
        #      extend() replaces the original's try/except around append,
        #      which could never raise.
        for matches in futures:
            found.extend(matches)
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found
Run Code Online (Sandbox Code Playgroud)

您可以通过as_completed正确使用来提高提交的性能。对于给定的期货迭代,它将返回一个迭代器,该迭代器将yield按照它们完成的顺序进行期货。

您还可以跳过将数据复制到另一个数组,并使用itertools.chain.from_iterable将期货的结果组合到单个可迭代对象:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Find occurrences of *number* in [0, nmax) via
       ProcessPoolExecutor.submit(), one chunk per worker, and return the
       results lazily as one chained iterable.

       Results are yielded in completion order (cf.as_completed), so the
       output is NOT sorted across chunks.

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool (and number of chunks).

       Returns:
           iterator over all matching integers (unordered across chunks).
    '''
    # 1. Local variables (the original's dead 'found = []', never read,
    #    has been removed).
    chunk = nmax // workers
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            # Last chunk absorbs any remainder from the integer division.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    # Flatten the per-chunk result lists into one lazy iterable, consuming
    # futures as they complete.
    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    # Benchmark parameters.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    result_iter = _concurrent_map(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    # Count lazily: result_iter is a generator, so len() is unavailable.
    print("found {0} in {1:.4f}sec".format(sum(1 for _ in result_iter), elapsed))
Run Code Online (Sandbox Code Playgroud)


Sun*_*ear 7

概述:

我的回答分为两部分:

  • 第1部分展示了如何从@ niemmi的ProcessPoolExecutor.map()解决方案中获得更多的加速.
  • 第2部分显示了何时ProcessPoolExecutor的子类.submit().map()产生非等效的计算时间.

=======================================================================

第1部分:ProcessPoolExecutor.map()的更多加速

背景: 本节以@niemmi的.map()解决方案为基础,该解决方案本身就非常出色.在对他的离散化方案进行一些研究以更好地理解如何与.map()chunksizes争论进行交互时,我发现了这个有趣的解决方案.

我认为@niemmi的定义是chunk = nmax // workerschunksize的定义,即工作池中每个工作人员要处理的实际数字范围(给定任务)的较小大小.现在,这个定义的前提是假设一台计算机有x个工人,在每个工人之间平均分配任务将导致每个工人的最佳使用,因此总任务将最快完成.因此,分解给定任务的块数应始终等于池工作者的数量.但是,这个假设是否正确?

命题:在这里,我提出上述假设并不总是导致计算时间最快ProcessPoolExecutor.map().相反,将任务分离到大于池工作者数量的量可以导致加速,即更快地完成给定任务.

实验：我修改了@niemmi的代码，允许离散任务的数量超过池工作者的数量。下面给出了该代码，用于统计数字5在0到1E8的数字范围内出现的次数。我已经使用1、2、4和6个池工作者，以及离散任务数量与池工作者数量的不同比率，执行了此代码。对于每个方案，进行了3次运行并将计算时间制成表格。"加速"在此定义为：块数等于池工作者数时的平均计算时间，除以块数大于池工作者数时的平均计算时间。

发现:

对nworkers来说很重要

  1. 左图显示了实验部分中提到的所有方案所花费的计算时间.它表明,通过采取计算时间块数/工人= 1的数 总是比所采取的计算时间更大的块数>工人数.也就是说,前一种情况总是效率低于后者.

  2. 右图显示,块数/工人数达到阈值14或更高时,获得了1.2倍或更多的加速.有趣的是,观察到加速趋势也发生在ProcessPoolExecutor.map()1名工人执行时.

结论：当自定义 ProcessPoolExecutor.map() 用于解决给定任务的离散任务数量时，谨慎的做法是确保该数量大于池工作者的数量，因为这种做法可以缩短计算时间。

concurrent.futures.ProcessPoolExecutor.map()代码.(仅限修订部分)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Find occurrences of *number* in [0, nmax) with a single
       ProcessPoolExecutor.map() call, splitting the range into
       *num_of_chunks* chunks (which may exceed the worker count).

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool.
           num_of_chunks: number of discretised sub-ranges to dispatch.

       Returns:
           list of all matching integers.
    '''
    # 1. Local variables (the original's dead 'futures = []' init, which
    #    was immediately overwritten below, has been removed).
    start = time()
    chunksize = nmax // num_of_chunks
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool.
        #      The final stop is forced to nmax to absorb the remainder of
        #      the integer division.
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                # 'is not None' (not truthiness) so a legitimate match of 0
                # is not silently dropped (e.g. when number='0'); the
                # original try/except around append could never fire.
                if f is not None:
                    found.append(f)
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark parameters.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
    num_of_chunks = chunks_vs_workers * workers

    t0 = time()
    matches = _concurrent_map(nmax, number, workers, num_of_chunks)
    elapsed = time() - t0
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))
Run Code Online (Sandbox Code Playgroud)

=======================================================================

第2部分:使用ProcessPoolExecutor子类时的总计算时间.submit()和.map()在返回排序/排序结果列表时可能不同.

背景:我修改了两个.submit().map()代码,允许对他们的计算时间进行"苹果到苹果"的比较,以及可视化主代码的计算时间的能力,主代码调用的_concurrent方法的计算时间.执行并发操作,以及_concurrent方法调用的每个离散化任务/工作者的计算时间.此外,这些代码中的并发方法被构造为直接从未来对象.submit()和迭代器返回结果的无序和有序列表 .map().源代码如下(希望它可以帮助您.).

实验这两个新改进的代码用于执行第1部分中描述的相同实验,除了只考虑6个池工作者并且使用python内置listsorted方法将结果的无序和有序列表返回到主要部分代码分别.

发现: .submit vs .map plus list vs sorted

  1. 从_concurrent方法的结果中,我们可以看到用于创建所有Future对象的_concurrent方法的计算时间ProcessPoolExecutor.submit(),并创建迭代器ProcessPoolExecutor.map(),作为离散任务数量与池工作者数量的函数,是等效的.这个结果只是意味着ProcessPoolExecutor子类.submit()并且.map()同样有效/快速.
  2. 比较main和它的_concurrent方法的计算时间,我们可以看到main比它的_concurrent方法运行得更长.这是预期的,因为它们的时间差反映了listsorted方法的计算时间量(以及这些方法中包含的其他方法的计算时间).很明显,该list方法返回结果列表所花费的计算时间少于sorted方法.list.submit()和.map()代码的方法的平均计算时间相似,约为0.47秒..submit()和.map()代码的排序方法的平均计算时间分别为1.23秒和1.01秒.换句话说,该list方法分别比sorted.submit()和.map()代码的方法执行快2.62倍和2.15倍.
  3. 目前尚不清楚为什么该sorted方法生成一个有序列表的 .map()速度要快于.submit(),因为离散化任务的数量增加的数量超过了池工作者的数量,除非离散任务的数量等于池工作者的数量.也就是说,这些研究结果表明,使用同样快速.submit().map()子类的决定可能会受到排序方法的阻碍.例如,如果意图是在尽可能短的时间内生成有序列表,则应优先使用ProcessPoolExecutor.map(),ProcessPoolExecutor.submit()因为.map()可以允许最短的总计算时间.
  4. 我的答案第1部分中提到的离散化方案在这里显示,以加快子类.submit().map()子类的性能.与离散任务的数量等于池工人数量的情况相比,加速量可高达20%.

改进了.map()代码

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurence of number in range nmin to nmax and return
       the found occurences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Dispatch *num_of_chunks* sub-ranges of [0, nmax) to a pool of
       *workers* processes through one executor.map() call and return the
       flattened match list (the unordered variant is active).'''
    t0 = time()
    chunksize = nmax // num_of_chunks
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # Chunk boundaries: starts step by chunksize; the last stop is
        # clamped to nmax so the integer-division remainder is covered.
        starts = [chunksize * i for i in range(num_of_chunks)]
        stops = [chunksize * i for i in range(1, num_of_chunks)] + [nmax]
        futures = executor.map(_findmatch, starts, stops, repeat(number))
    elapsed = time() - t0
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(elapsed))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    # Benchmark parameters.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
    num_of_chunks = chunks_vs_workers * workers

    t0 = time()
    matches = _concurrent(nmax, number, workers, num_of_chunks)
    elapsed = time() - t0
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', matches)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))    
Run Code Online (Sandbox Code Playgroud)

改进了.submit()代码.
除了用以下代码替换_concurrent方法之外,此代码与.map代码相同:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Dispatch *num_of_chunks* sub-ranges of [0, nmax) to a pool of
       *workers* processes via executor.submit() and return the flattened
       match list in completion order.

       Args:
           nmax: exclusive upper bound of the number range searched.
           number: digit(s) to look for, given as a string.
           workers: size of the process pool.
           num_of_chunks: number of discretised sub-ranges to submit.

       Returns:
           list of all matching integers, unordered across chunks.
    '''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            # Last chunk absorbs the remainder of the integer division.
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    # Ordered alternative (the original commented variant was a verbatim
    # copy of the unordered line; 'sorted' is the intended difference):
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

=======================================================================