mat*_*tos 3 python parallel-processing
我使用的是 Python 版本 3.5.1。我想并行化一个循环,该循环用于使用 imshow 绘制一组数组。没有任何并行化的最小代码如下
import matplotlib.pyplot as plt
import numpy as np
# Generate data
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
# Loop and plot sequentially
for i in range(len(arrays)):
# Plot side by side
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close()
Run Code Online (Sandbox Code Playgroud)
该代码目前是在 Jupyter 笔记本中编写的,我想仅通过 Jupyter 笔记本进行所有处理。虽然这种方法效果很好,但实际上我有 2500 多个数组,并且每秒大约绘制 1 个图,这需要很长时间才能完成。我想做的是将计算拆分到 N 个处理器上,以便每个处理器为 len(arrays)/N 个数组绘制图。由于绘图是各个阵列本身的图,因此在任何计算过程中核心都不需要相互通信(无共享)。
我发现多处理包对于类似的问题很有用。但是,它不适用于我的问题,因为您无法将二维数组传递到函数中。如果我这样修改上面的代码
# Generate data
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
x = list(zip(arrays, arrays_2))
def plot_file(information):
arrays, arrays_2 = list(information[0]), list(information[1])
print(np.shape(arrays[0][0]), np.shape(arrays_2[0][0]))
# Loop and plot sequentially
for i in range(len(arrays)):
# Plot side by side
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close()
from multiprocessing import Pool
pool = Pool(4)
pool.map(plot_file, x)
Run Code Online (Sandbox Code Playgroud)
然后我收到错误“TypeError:图像数据的尺寸无效”,并且数组尺寸的打印输出现在只是 (2, ) 而不是 (3, 2)。显然,这是因为多重处理不/不能处理 2D 数组作为输入。
所以我想知道如何在 Jupyter 笔记本中并行化它?有人可以告诉我该怎么做吗?
编辑(2022 年 3 月 11 日):
我的原始代码的真正问题是 pool.map(func, args) 一次传入 args 的一个元素以在单个处理器上运行,而不是像我想象的那样传递整个数组列表,这意味着当我尝试循环时在数组列表上,我循环遍历数组的行,然后尝试对行进行 imshow 绘图,从而产生错误。
不管怎样,虽然这个问题已经有一个很好的答案被接受,但我想我会提供仅使用多处理包工作的代码,以防其他人遇到同样的问题或者如果有人想看看应该如何完成。
n = 10
arrays_1 = (np.random.rand(256, 256) for x in range(n))
arrays_2 = (np.random.rand(256, 256) for x in range(n))
x = zip(range(n), arrays_1, arrays_2) # need to pass the args into pool.map(func, args) as a tuple
def plot_file(information):
# get cpu name that is working on current data
process_name = multiprocessing.current_process().name
print('Process name {} is plotting'.format(process_name))
# unpack elements of tuple
index, arrays_1, arrays_2 = information
# plot
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays_1, interpolation='gaussian', cmap='RdBu')
ax_2.imshow(arrays_2, interpolation='gaussian', cmap='YlGn')
# save
plt.savefig('./{}'.format(index), bbox_inches='tight')
plt.close()
import multiprocessing
if __name__ == "__main__":
pool = multiprocessing.Pool(multiprocessing.cpu_count()//4) # use one quarter of available processors
pool.map(plot_file, x) # sequentially map each element of x to the function and process
Run Code Online (Sandbox Code Playgroud)
一种简单的方法是使用dask.distributed
多处理引擎。我只建议使用外部模块,因为 dask 会为您处理对象的序列化,从而使操作变得非常简单:
import matplotlib
# include this line to allow your processes to plot without a screen
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import dask.distributed
import numpy as np
def plot_file(i, array_1, array_2):
matplotlib.use('Agg')
# will be called once for each array "job"
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
ax.imshow(
arr,
interpolation='gaussian',
cmap='RdBu',
vmin=0.5*np.min(arr),
vmax=0.5*np.max(arr),
)
figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close(figure)
arrays = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
client = dask.distributed.Client() # uses multiprocessing by default
futures = client.map(plot_file, range(len(arrays)), arrays, arrays_2)
dask.distributed.progress(futures)
Run Code Online (Sandbox Code Playgroud)
然而,如果可能的话,更有效的方法是在映射任务中生成或准备数组。这也允许您并行执行数组操作、I/O 等:
def prep_arrays_and_plot(i):
array_1 = np.random.rand(3,2)
array_2 = np.random.rand(3,2)
plot_file(i, array_1, array_2)
futures = client.map(prep_arrays_and_plot, range(10))
dask.distributed.progress(futures)
Run Code Online (Sandbox Code Playgroud)
此时,您不需要腌制任何东西,因此使用多处理进行编写并不是什么大问题。下面的脚本运行得很好:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing
def plot_file(i, array_1, array_2):
matplotlib.use('Agg')
# will be called once for each array "job"
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
ax.imshow(
arr,
interpolation='gaussian',
cmap='RdBu',
vmin=0.5*np.min(arr),
vmax=0.5*np.max(arr),
)
figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close(figure)
def prep_arrays_and_plot(i):
array_1 = np.random.rand(3,2)
array_2 = np.random.rand(3,2)
plot_file(i, array_1, array_2)
def main():
pool = multiprocessing.Pool(4)
pool.map(prep_arrays_and_plot, range(10))
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
请注意,如果您从 jupyter 笔记本运行此程序,则不能简单地在单元格中定义函数并将它们传递给 multiprocessing.Pool。相反,您必须在不同的文件中定义它们并导入它们。这不适用于dask(事实上,如果用dask定义notebook中的函数会更容易)。
归档时间: |
|
查看次数: |
881 次 |
最近记录: |