Tags: python, parallel-processing, multithreading, multiprocessing, python-asyncio
1. I have a function var. I'd like to know the best way to run the loop inside this function quickly via multiprocessing/parallel processing, making use of all the processors, cores, threads, and RAM the system has.
import numpy as np
from pysheds.grid import Grid

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def var(interest):
    variable_avg = []
    for (x, y) in zip(xs, ys):
        grid = Grid.from_raster(interest, data_name='map')
        grid.catchment(data='map', x=x, y=y, out_name='catch')
        variable = grid.view('catch', nodata=np.nan)
        variable = np.array(variable)
        variable_avg.append(variable.mean())
    return variable_avg
2. It would be great if I could run the function var and its loop in parallel for multiple arguments of the function, e.g. var(a) and var(b) at the same time, since that would take much less time than parallelizing the loop alone.
Ignore 2 if it doesn't make sense.
TLDR:
You can use the multiprocessing library to run your var function in parallel. However, as written, you likely don't make enough calls to var for multiprocessing to pay for its overhead. If all you need is to run those two calls, running them serially is probably the fastest you'll get. But if you need to make many calls, multiprocessing can help you out.
We need to use a process pool to run this in parallel; threads won't work here because Python's global interpreter lock will prevent us from achieving true parallelism. The drawback of a process pool is that processes are slow to spin up. In the example of just running two calls to var, the time to create the pool exceeds the time spent running var itself.
To illustrate this, let's use a process pool with asyncio to run calls to var in parallel, and compare it to just running things sequentially. Note that to run this example I used an image from the pysheds library, https://github.com/mdbartos/pysheds/tree/master/data - if your images are much larger, the following may not hold.
import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    with ProcessPoolExecutor() as pool:
        task_one = loop.run_in_executor(pool, functools.partial(var, a))
        task_two = loop.run_in_executor(pool, functools.partial(var, a))
        results = await asyncio.gather(task_one, task_two)
    pool_end = time.time()
    print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()
    result_one = var(a)
    result_two = var(a)
    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')

if __name__ == "__main__":
    asyncio.run(main())
Running the above on my machine (a 2.4 GHz 8-core Intel Core i9), I get the following output:
Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336
In this example, the process pool is over five times slower! This is due to the overhead of creating and managing multiple processes. That said, if you need to call var more than just a few times, a process pool may make more sense. Let's adapt it to run var 100 times and compare the results:
async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
        results = await asyncio.gather(*tasks)
    pool_end = time.time()
    print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()
    for _ in range(100):
        result = var(a)
    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')
Running var 100 times, I get the following output:
Process pool took 3.442288875579834
Running in serial took 13.769982099533081
In this case, running in a process pool is about four times faster. You may also wish to try running each iteration of your loop concurrently. You can do this by creating a function that processes one x,y coordinate at a time, and then running each point you want to examine in the process pool:
def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch')
    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x, y) in zip(xs, ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))
    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
    pool_end = time.time()
    print(f'Process pool took {pool_end-pool_start}')

    serial_start = time.time()
In this case I get Process pool took 3.2950568199157715 - so no faster than our first version with one process per call of var. This is likely because the limiting factor at this point is how many cores are available on our CPU; splitting our work into smaller increments does not add much value.
That said, if you have 1000 x and y coordinates you wish to examine across two images, this last approach could yield a performance gain.