use*_*959 5 python arrays parallel-processing numpy
我正在尝试使用 Multiprocessing 和 Pool.map() 命令并行化我一直在研究的算法。我遇到了一个问题,希望有人能指出我正确的方向。
让 x 表示一个 N 行 1 列的数组,它被初始化为一个零向量。让 C 表示一个长度为 N 乘以 2 的数组。向量 x 是通过使用来自 C 的某些子集的信息(进行一些数学运算)迭代构建的。作为大型 for 循环的代码(未并行化)大致如下所示:
for j in range(0,N)
#indx_j will have n_j <<N entries
indx_j = build_indices(C,j)
#x_j will be entries to be added to vector x at indices indx_j
#This part is time consuming
x_j = build_x_j(indx_j,C)
#Add x_j into entries of x
x[indx_j] = x[indx_j] + x_j
Run Code Online (Sandbox Code Playgroud)
我能够使用 multiprocessing 模块并行化它并使用 pool.map 来消除大型 for 循环。除了将 x_j 添加到 x[indx_j] 的步骤之外,我编写了一个执行上述计算的函数。相反,并行化函数返回两个数据集:x_j 和 indx_j。在计算完这些之后,我运行一个 for 循环(非并行),通过对 j=0,N 进行 x[indx_j] = x[indx_j] +x_j 计算来构建 x。
我的方法的缺点是 pool.map 操作返回 N 对数组 x_j 和 indx_j 的巨大列表。其中 x_j 和 indx_j 都是 n_j 乘以 1 个向量(n_j << N)。对于大 N(N > 20,000),这占用了太多内存。这是我的问题:我能否以某种方式并行地执行构造操作 x[indx_j] = x[indx_j] + x_j。在我看来,pool.map() 中的每个进程都必须能够与向量 x 进行交互。我是否将 x 放在某种共享内存中?我怎么会做这样的事情?我怀疑这必须以某种方式成为可能,因为我假设人们一直在为有限元方法并行组装矩阵。如何让多个进程与向量交互而不会出现某种问题?我担心也许对于 j= 20 和 j = 23,如果它们同时发生,他们可能会尝试添加 x[indx_20] = x[indx_20] + x_20 并同时添加 x[indx_30] = x[indx_30] + x_30 并且可能会发生一些错误。我也不知道如何通过 pool.map() 完成这个计算(我不认为我可以将 x 作为输入输入,因为它会在每个过程之后发生变化)。
我不确定这是否重要,但集合 indx_j 将具有非平凡的交集(例如,indx_1 和 indx_2 可能具有索引 [1,2,3] 和 [3,4,5])。
如果这不清楚,请告诉我,我会尽力澄清。这是我第一次尝试并行工作,所以我非常不确定如何进行。任何信息将不胜感激。谢谢!
我不知道我是否有资格就共享内存数组的主题提供适当的建议,但我最近也有类似的需要在 python 中跨进程共享数组,并且在numpy.ndarraynumpy 中使用共享内存数组遇到了一个小型自定义实现multiprocessing.内的 ctypes 这是代码的链接:shmarray.py。它的行为就像普通数组一样,只不过底层数据存储在共享内存中,这意味着单独的进程可以读取和写入同一个数组。
使用共享内存阵列
在线程中,线程可用的所有信息(全局和本地命名空间)都可以在有权访问它的所有其他线程之间共享处理,但在多处理中,数据并不那么容易访问。在Linux上数据可以读取,但不能写入。相反,当写入完成时,数据会被复制然后写入,这意味着其他进程无法看到这些更改。但是,如果要写入的内存是共享内存,则不会复制它。这意味着使用 shmarray,我们可以执行与线程处理类似的操作,并具有真正的多处理并行性。访问共享内存数组的一种方法是使用子类。我知道您当前正在使用 Pool.map(),但我感觉 map 的工作方式受到限制,尤其是在处理 n 维数组时。Pool.map() 并不是真正设计用于与 numpy 风格的接口一起使用,至少我认为它不容易。这是一个简单的想法,您可以在其中为每个jin生成一个进程N:
import numpy as np
import shmarray
import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, j, C, x):
multiprocessing.Process.__init__()
self.shared_x = x
self.C = C
self.j = j
def run(self):
#Your Stuff
#indx_j will have n_j <<N entries
indx_j = build_indices(self.C,self.j)
#x_j will be entries to be added to vector x at indices indx_j
x_j = build_x_j(indx_j,self.C)
#Add x_j into entries of x
self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
#And then actually do the work
N = #What ever N should be
x = shmarray.zeros(shape=(N,1))
C = #What ever C is, doesn't need to be shared mem, since no writing is happening
procs = []
for j in range(N):
proc = Worker(j, C, x)
procs.append(proc)
proc.start()
#And then join() the processes with the main process
for proc in procs:
proc.join()
Run Code Online (Sandbox Code Playgroud)
自定义进程池和队列
因此,这可能有效,但如果您只有几个核心,那么生成数千个进程实际上并没有任何用处。我处理这个问题的方法是Queue在我的流程之间实现一个系统。也就是说,我们有Queue一个主进程用 填充j,然后几个工作进程从 中获取数字Queue并使用它,请注意,通过实现这个,您实际上正在做的正是所做的事情Pool。另请注意,我们实际上将使用multiprocessing.JoinableQueue此方法,因为它允许使用 join() 等待队列清空。
实现这个其实并不难,只是我们必须稍微修改我们的子类以及我们如何使用它。导入 numpy 作为 np 导入 shmarray 导入多处理
class Worker(multiprocessing.Process):
def __init__(self, C, x, job_queue):
multiprocessing.Process.__init__()
self.shared_x = x
self.C = C
self.job_queue = job_queue
def run(self):
#New Queue Stuff
j = None
while j!='kill': #this is how I kill processes with queues, there might be a cleaner way.
j = self.job_queue.get() #gets a job from the queue if there is one, otherwise blocks.
if j!='kill':
#Your Stuff
indx_j = build_indices(self.C,j)
x_j = build_x_j(indx_j,self.C)
self.shared_x[indx_j] = self.shared_x[indx_j] + x_j
#This tells the queue that the job that was pulled from it
#Has been completed (we need this for queue.join())
self.job_queue.task_done()
#The way we interact has changed, now we need to define a job queue
job_queue = multiprocessing.JoinableQueue()
N = #What ever N should be
x = shmarray.zeros(shape=(N,1))
C = #What ever C is, doesn't need to be shared mem, since no writing is happening
procs = []
proc_count = multiprocessing.cpu_count() # create as many procs as cores
for _ in range(proc_count):
proc = Worker(C, x, job_queue) #now we pass the job queue instead
procs.append(proc)
proc.start()
#all the workers are just waiting for jobs now.
for j in range(N):
job_queue.put(j)
job_queue.join() #this blocks the main process until the queue has been emptied
#Now if you want to kill all the processes, just send a 'kill'
#job for each process.
for proc in procs:
job_queue.put('kill')
job_queue.join()
Run Code Online (Sandbox Code Playgroud)
最后,我真的不能说这将如何同时处理重叠索引的写入。最糟糕的情况是,如果两件事尝试同时写入并且事情被损坏/崩溃,您可能会遇到严重的问题(我不是这里的专家,所以我真的不知道这种情况是否会发生)。最好的情况是,因为您只是在做加法,并且操作顺序并不重要,一切都会顺利进行。如果运行不顺利,我的建议是创建第二个自定义Process子类,专门执行数组分配。要实现这一点,您需要将作业队列和“输出”队列传递给 Worker 子类。在 while 循环中,您应该有一个 `output_queue.put((indx_j, x_j))。 注意:如果您将它们放入队列中,它们将被腌制,这可能会很慢。如果可以的话,我建议在使用 put 之前将它们设置为共享内存数组。在某些情况下,腌制它们可能会更快,但我还没有对此进行测试。 要在生成时分配这些值,您需要让分配器进程从队列中读取这些值作为作业并应用它们,这样工作循环本质上是:
def run(self):
job = None
while job!='kill':
job = self.job_queue.get()
if job!='kill':
indx_j, x_j = job
#Note this is the process which really needs access to the X array.
self.x[indx_j] += x_j
self.job_queue.task_done()
Run Code Online (Sandbox Code Playgroud)
最后一个解决方案可能会比在工作线程中进行分配慢,但如果您这样做,则不必担心竞争条件,并且内存仍然更轻,因为您可以在生成 和indx_j值x_j时用完它们,而不是等到所有这些都完成。
Windows 注意事项
我没有在 Windows 上做任何这些工作,所以我不能 100% 确定,但我相信上面的代码将非常占用内存,因为 Windows 没有实现用于生成独立进程的写时复制系统。本质上,Windows 在从主进程生成新进程时会复制进程所需的所有信息。为了解决这个问题,我认为用共享内存数组(您将传递给其他进程的任何东西)代替普通数组替换您的所有x_j和应该会导致 Windows不复制数据,但我不确定。你没有指定你所在的平台,所以我认为安全总比遗憾好,因为多处理在 Windows 上与 Linux 上是不同的。C
| 归档时间: |
|
| 查看次数: |
3578 次 |
| 最近记录: |