使用 Python multiprocessing 在多个进程间共享 numpy 数组

Sen*_*Yan 2 python arrays numpy python-multiprocessing

我想利用多进程（multiprocessing）并行地修改一个大型 numpy 数组中的部分值。

也就是说，我最终想得到 [[100, 100, 100], [100, 100, 100]]。

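但是，下面的代码会出错，报错信息为“RuntimeError: SynchronizedArray 对象只能通过继承在进程之间共享”（英文原文：RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance）。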

我应该怎么做？谢谢。

import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def change_array(array, i, j):
    """Write 100 into cell (i, j) of a shared 2x3 float64 buffer, then echo it.

    ``array`` must expose ``get_obj()`` returning a buffer of six doubles,
    e.g. ``Array('d', 6)`` from ``multiprocessing``.
    """
    grid = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
    grid[i, j] = 100
    flat = np.frombuffer(array.get_obj())
    print(flat)

if __name__ == '__main__':
    # Reproduces the failure asked about in the question: the shared array X is
    # passed to the workers as a per-task argument. multiprocessing rejects this
    # with a RuntimeError (SynchronizedArray objects may only be shared between
    # processes through inheritance).
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    # Shared buffer of X_shape[0] * X_shape[1] doubles ('d'). Array() wraps it
    # with a lock by default, so get_obj() is used to reach the raw ctypes array.
    X = Array('d', X_shape[0] * X_shape[1])
    # Wrap X as a NumPy array so we can easily manipulate its data.
    X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)

    pool = multiprocessing.Pool(processes=3)

    result = []
    for i in range(2):
        for j in range(3):
            # BUG (the subject of the question): X cannot travel as a task
            # argument; it has to be handed to each worker when the pool is
            # created, e.g. Pool(initializer=..., initargs=(X,)).
            result.append(pool.apply_async(change_array, (X, i, j,)))

    # Wait for every task; r.get() re-raises an error raised for its task.
    # If it raises, pool.close() / pool.join() below are skipped.
    result = [r.get() for r in result]
    pool.close()
    pool.join()

    # Show the final contents of the shared buffer as a 2x3 array.
    print(np.frombuffer(X.get_obj()).reshape(2, 3))

Run Code Online (Sandbox Code Playgroud)

Boo*_*boo 7

您需要做两处修改：

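  1. 使用带锁的 multiprocessing.Array 实例（这实际上就是默认行为），而不是“普通”的（不带锁的）数组。（注：问题代码中 from multiprocessing import Array 导入的正是同一个函数，默认 lock=True，报错信息中的 SynchronizedArray 也说明了这一点，所以这一条本身并不是出错的原因；真正解决问题的是第 2 条。）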
  2. 不要把数组实例作为参数传给工作函数。相反，应在创建进程池时通过 initializer/initargs 把数组交给池中的每个工作进程，并在其中保存为全局变量。原因是：apply_async 的参数要经过 pickle 序列化后发送给工作进程，而同步共享数组只能在子进程创建时通过继承来共享。
import numpy as np
import multiprocessing

from multiprocessing import RawArray, Array


def initpool(arr):
    """Pool initializer: publish the shared array as this module's ``array``.

    A synchronized shared array can only reach a worker when the worker is
    created (passed through ``initargs``), so it is stashed in a module-level
    name where task functions can find it without receiving it as an argument.
    """
    globals()["array"] = arr

def change_array(i, j):
    """Set cell (i, j) of the shared 2x3 float64 array to 100 and print the buffer.

    Reads the module-level ``array`` that ``initpool`` installs in each worker.
    The array's lock is not acquired here: each task writes its own cell, but
    the printed snapshot may include other workers' concurrent updates.
    """
    shared = array.get_obj()
    grid = np.frombuffer(shared, dtype=np.float64).reshape(2, 3)
    grid[i, j] = 100
    print(np.frombuffer(shared))

if __name__ == '__main__':
    X_shape = (2, 3)
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    n_rows, n_cols = X_shape

    # Shared float64 buffer guarded by a lock (lock=True is also the default).
    X = multiprocessing.Array('d', n_rows * n_cols, lock=True)
    # Copy the initial values in through a NumPy view over the shared buffer.
    np.copyto(np.frombuffer(X.get_obj()).reshape(X_shape), data)

    # Hand X to every worker once, at start-up, through the initializer;
    # it must not be sent as a per-task argument.
    pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))
    pending = [
        pool.apply_async(change_array, (row, col))
        for row in range(n_rows)
        for col in range(n_cols)
    ]
    for job in pending:
        job.get()  # re-raises any exception raised inside the worker
    pool.close()
    pool.join()

    print(np.frombuffer(X.get_obj()).reshape(X_shape))
Run Code Online (Sandbox Code Playgroud)

输出：

[100.    2.2   3.3   4.4   5.5   6.6]
[100.  100.    3.3   4.4   5.5   6.6]
[100.  100.  100.    4.4   5.5   6.6]
[100.  100.  100.  100.    5.5   6.6]
[100.  100.  100.  100.  100.    6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
 [100. 100. 100.]]
Run Code Online (Sandbox Code Playgroud)

更新

由于在本例中，写入数组 data 的新值并不依赖于数组中已有的值，因此函数 change_array 无需访问该数组。可以按照 Frank Yellin 的建议，让它只返回一个由待修改的索引和新值组成的元组，再由主进程把新值写回数组。我上面仍然展示了在函数确实需要访问/修改数组时应如何传递数组。不过就本例而言，下面的代码就足够了（我做了一些简化）：

import numpy as np
import multiprocessing


def change_array(i, j):
    """Describe the update for cell (i, j) as a (row, col, new_value) tuple.

    The worker never touches the array; the parent process applies the update.
    """
    new_value = 100
    return (i, j, new_value)

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    with multiprocessing.Pool(processes=3) as pool:
        # One task per cell, submitted in row-major order.
        pending = [
            pool.apply_async(change_array, (row, col))
            for row in range(2)
            for col in range(3)
        ]
        # Apply each worker's (row, col, value) result here in the parent.
        for row, col, value in (job.get() for job in pending):
            data[row, col] = value
        print(data)
Run Code Online (Sandbox Code Playgroud)

或者:

import numpy as np
import multiprocessing
import itertools


def change_array(t):
    """Map an (i, j) index pair to the (i, j, new_value) update for that cell.

    Takes a single tuple so it can be used directly with ``Pool.map`` over
    ``itertools.product``.
    """
    row, col = t
    update = (row, col, 100)
    return update

if __name__ == '__main__':
    data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
    n_rows, n_cols = data.shape
    # Every (row, col) index pair of data, in row-major order.
    cells = itertools.product(range(n_rows), range(n_cols))
    with multiprocessing.Pool(processes=3) as pool:
        updates = pool.map(change_array, cells)
        for row, col, value in updates:
            data[row, col] = value
        print(data)
Run Code Online (Sandbox Code Playgroud)