为什么在使用pythons多处理在for循环中使用共享numpy数据进行令人难以置信的并行问题时,没有加速?

Eng*_*her 15 python multiprocessing

我想加快一个与贝叶斯推理相关的令人尴尬的并行问题.目的是在给定矩阵A的情况下推断出一组图像x的系数u,使得X = A*U. X的尺寸为mxn,A mxp和U pxn.对于X的每一列,必须推断系数U的最佳对应列.最后,该信息用于更新A.我使用m = 3000,p = 1500和n = 100.因此,因为它是在线性模型中,系数矩阵u的推断由n个独立的计算组成.因此,我尝试使用Python的多处理模块,但没有加速.

这是我做的:

没有并行化的主要结构是:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))
Run Code Online (Sandbox Code Playgroud)

多处理的实现:

我试图实现多处理.我有一台可以使用的8核机器.

  1. 有3个for循环.唯一似乎是"可并行化"的是第三个,其中系数是推断的:
    • 产生队列和从0堆栈迭代号码的batch_size到-1到队列
    • 生成8个进程,让它们通过Queue工作
  2. 使用multiprocessing.Array共享数据U.

所以,我用以下内容替换了第三个循环:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()
Run Code Online (Sandbox Code Playgroud)

这是Wrap_mp类:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()
Run Code Online (Sandbox Code Playgroud)

这里是函数infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break
Run Code Online (Sandbox Code Playgroud)

现在的问题如下:

  1. 对于给定的数据维度,多处理版本并不比单线程版本快.
  2. 每次迭代都会增加进程ID.这是否意味着不断产生新的流程?这不会产生巨大的开销吗?我怎么能避免这种情况?有可能在整个for循环中创建8个不同的进程,只是用数据更新它们吗?
  3. 我在进程中共享系数U的方式是否会减慢计算速度?还有另一个更好的方法吗?
  4. 流程池会更好吗?

我非常感谢任何帮助!我一个月前就开始使用Python了,现在我很迷茫.

恩金

Ted*_*rek 5

每次创建Process时,您都在创建一个新流程.如果您在for循环中执行此操作,那么是的,您每次循环都会启动新进程.听起来你想要做的是在循环外初始化你的队列和进程,然后在循环内填充队列.

我之前使用过multiprocessing.Pool,它很有用,但它并没有提供你已经使用Queue实现的内容.