在 Jupyter Notebook 中并行化绘图循环

mat*_*tos 3 python parallel-processing

我使用的是 Python 版本 3.5.1。我想并行化一个循环,该循环用于使用 imshow 绘制一组数组。没有任何并行化的最小代码如下

import matplotlib.pyplot as plt
import numpy as np

# Generate data

arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]

# Loop and plot sequentially

for i in range(len(arrays)):

    # Plot side by side

    figure = plt.figure(figsize = (20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)

    ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
    ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))

    plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close()
Run Code Online (Sandbox Code Playgroud)

该代码目前是在 Jupyter 笔记本中编写的,我想仅通过 Jupyter 笔记本进行所有处理。虽然这种方法效果很好,但实际上我有 2500 多个数组,并且每秒大约绘制 1 个图,这需要很长时间才能完成。我想做的是将计算拆分到 N 个处理器上,以便每个处理器为 len(arrays)/N 个数组绘制图。由于绘图是各个阵列本身的图,因此在任何计算过程中核心都不需要相互通信(无共享)。

我发现多处理包对于类似的问题很有用。但是,它不适用于我的问题,因为您无法将二维数组传递到函数中。如果我这样修改上面的代码

# Generate data

arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]

x = list(zip(arrays, arrays_2))

def plot_file(information):

    arrays, arrays_2 = list(information[0]), list(information[1])
    print(np.shape(arrays[0][0]), np.shape(arrays_2[0][0]))
    
    # Loop and plot sequentially

    for i in range(len(arrays)):        

        # Plot side by side

        figure = plt.figure(figsize = (20, 12))
        ax_1 = figure.add_subplot(1, 2, 1)
        ax_2 = figure.add_subplot(1, 2, 2)

        ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
        ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))

        plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
        plt.close()
    
from multiprocessing import Pool
pool = Pool(4)
pool.map(plot_file, x)
Run Code Online (Sandbox Code Playgroud)

然后我收到错误“TypeError:图像数据的尺寸无效”,并且数组尺寸的打印输出现在只是 (2, ) 而不是 (3, 2)。显然,这是因为多重处理不/不能处理 2D 数组作为输入。

所以我想知道如何在 Jupyter 笔记本中并行化它?有人可以告诉我该怎么做吗?


编辑(2022 年 3 月 11 日):

我的原始代码的真正问题是 pool.map(func, args) 一次传入 args 的一个元素以在单个处理器上运行,而不是像我想象的那样传递整个数组列表,这意味着当我尝试循环时在数组列表上,我循环遍历数组的行,然后尝试对行进行 imshow 绘图,从而产生错误。

不管怎样,虽然这个问题已经有一个很好的答案被接受,但我想我会提供仅使用多处理包工作的代码,以防其他人遇到同样的问题或者如果有人想看看应该如何完成。

n        = 10
arrays_1 = (np.random.rand(256, 256) for x in range(n))
arrays_2 = (np.random.rand(256, 256) for x in range(n))

x = zip(range(n), arrays_1, arrays_2) # need to pass the args into pool.map(func, args) as a tuple

def plot_file(information):
    
    # get cpu name that is working on current data
    process_name = multiprocessing.current_process().name
    print('Process name {} is plotting'.format(process_name))

    # unpack elements of tuple
    index, arrays_1, arrays_2 = information
    
    # plot
    figure = plt.figure(figsize = (20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)

    ax_1.imshow(arrays_1, interpolation='gaussian', cmap='RdBu')
    ax_2.imshow(arrays_2, interpolation='gaussian', cmap='YlGn')

    # save
    plt.savefig('./{}'.format(index), bbox_inches='tight')
    plt.close()

import multiprocessing
if __name__ == "__main__":
    
    pool = multiprocessing.Pool(multiprocessing.cpu_count()//4) # use one quarter of available processors
    pool.map(plot_file, x)                                      # sequentially map each element of x to the function and process
Run Code Online (Sandbox Code Playgroud)

Mic*_*ado 6

一种简单的方法是使用dask.distributed多处理引擎。我只建议使用外部模块,因为 dask 会为您处理对象的序列化,从而使操作变得非常简单:

import matplotlib
# include this line to allow your processes to plot without a screen
matplotlib.use('Agg')

import matplotlib.pyplot as plt
import dask.distributed
import numpy as np

def plot_file(i, array_1, array_2):
    matplotlib.use('Agg')

    # will be called once for each array "job"
    figure = plt.figure(figsize = (20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)

    for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
        ax.imshow(
            arr,
            interpolation='gaussian',
            cmap='RdBu',
            vmin=0.5*np.min(arr),
            vmax=0.5*np.max(arr),
        )

    figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close(figure)

arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]

client = dask.distributed.Client() # uses multiprocessing by default
futures = client.map(plot_file, range(len(arrays)), arrays, arrays_2)
dask.distributed.progress(futures)
Run Code Online (Sandbox Code Playgroud)

然而,如果可能的话,更有效的方法是在映射任务中生成或准备数组。这也允许您并行执行数组操作、I/O 等:

def prep_arrays_and_plot(i):
    array_1 = np.random.rand(3,2)
    array_2 = np.random.rand(3,2)
    plot_file(i, array_1, array_2)

futures = client.map(prep_arrays_and_plot, range(10))
dask.distributed.progress(futures)
Run Code Online (Sandbox Code Playgroud)

此时,您不需要腌制任何东西,因此使用多处理进行编写并不是什么大问题。下面的脚本运行得很好:

import matplotlib
matplotlib.use("Agg")

import matplotlib.pyplot as plt
import numpy as np
import multiprocessing

def plot_file(i, array_1, array_2):
    matplotlib.use('Agg')

    # will be called once for each array "job"
    figure = plt.figure(figsize = (20, 12))
    ax_1 = figure.add_subplot(1, 2, 1)
    ax_2 = figure.add_subplot(1, 2, 2)

    for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
        ax.imshow(
            arr,
            interpolation='gaussian',
            cmap='RdBu',
            vmin=0.5*np.min(arr),
            vmax=0.5*np.max(arr),
        )

    figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
    plt.close(figure)

def prep_arrays_and_plot(i):
    array_1 = np.random.rand(3,2)
    array_2 = np.random.rand(3,2)
    plot_file(i, array_1, array_2)

def main():
    pool = multiprocessing.Pool(4)
    pool.map(prep_arrays_and_plot, range(10))

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

请注意,如果您从 jupyter 笔记本运行此程序,则不能简单地在单元格中定义函数并将它们传递给 multiprocessing.Pool。相反,您必须在不同的文件中定义它们并导入它们。这不适用于dask(事实上,如果用dask定义notebook中的函数会更容易)。