在函数中执行循环多重处理的最快方法?

Gun*_*Gun 4 python parallel-processing multithreading multiprocessing python-asyncio

1.我有一个函数var。我想知道通过利用系统拥有的所有处理器、内核、线程和 RAM 内存进行多处理/并行处理来快速运行此函数中的循环的最佳方法。

import numpy
from pysheds.grid import Grid

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def var(interest):
    
    variable_avg = []
    for (x,y) in zip(xs,ys):
        grid = Grid.from_raster(interest, data_name='map')

        grid.catchment(data='map', x=x, y=y, out_name='catch')

        variable = grid.view('catch', nodata=np.nan)
        variable = numpy.array(variable)
        variablemean = (variable).mean()
        variable_avg.append(variablemean)
    return(variable_avg)

Run Code Online (Sandbox Code Playgroud)

2.var如果我可以针对给定的函数多个参数并行运行函数和循环,那就太好了。var(a)例如:var(b)同时。因为它比单独并行化循环消耗的时间要少得多。

如果没有意义,请忽略 2。

Mat*_*ler 5

TLDR: 您可以使用多处理库并行运行您的var函数。但是,正如所写,由于其开销,您可能没有var对多处理进行足够的调用来获得性能优势。如果您所需要做的就是运行这两个调用,那么串行运行可能是您获得的最快速度。但是,如果您需要进行大量调用,多重处理可以帮助您。

我们需要使用进程池来并行运行它,线程在这里不起作用,因为 Python 的全局解释器锁将阻止我们实现真正的并行性。进程池的缺点是进程的启动速度很慢。在仅运行两次调用的示例中,var创建池的时间超过了运行var本身所花费的时间。

为了说明这一点,让我们使用进程池并使用 asyncio 并行运行调用var,并将其与仅顺序运行事物进行比较。请注意,为了运行此示例,我使用了 Pysheds 库https://github.com/mdbartos/pysheds/tree/master/data中的图像- 如果您的图像大得多,则以下内容可能不成立。

import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    with ProcessPoolExecutor() as pool:
        task_one = loop.run_in_executor(pool, functools.partial(var, a))
        task_two = loop.run_in_executor(pool, functools.partial(var, a))
        results = await asyncio.gather(task_one, task_two)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()

    result_one = var(a)
    result_two = var(a)

    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

在我的机器(2.4 GHz 8 核 Intel Core i9)上运行上述命令,我得到以下输出:

Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336
Run Code Online (Sandbox Code Playgroud)

在此示例中,进程池的速度慢了五倍多!这是由于创建和管理多个进程的开销造成的。也就是说,如果您需要调用var多次,那么进程池可能更有意义。让我们对其进行调整以运行var100 次并比较结果:

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()

    for _ in range(100):
        result = var(a)

    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')
Run Code Online (Sandbox Code Playgroud)

运行 100 次,我得到以下输出:

Process pool took 3.442288875579834
Running in serial took 13.769982099533081
Run Code Online (Sandbox Code Playgroud)

在这种情况下,在进程池中运行大约快 4 倍。您可能还希望尝试同时运行循环的每次迭代。您可以通过创建一个一次处理一个 x,y 坐标的函数来完成此操作,然后在进程池中运行要检查的每个点:

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time() 
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我得到Process pool took 3.2950568199157715- 所以并不比我们的第一个版本快,每次调用一个进程var。这可能是因为此时的限制因素是我们的 CPU 上有多少个可用核心,将我们的工作分成更小的增量并不会增加太多价值。

也就是说,如果您希望在两个图像上检查 1000 个 x 和 y 坐标,则最后一种方法可能会带来性能提升。