Parallel processing with multiprocessing is slower than sequential processing

Jan*_*wda 2 python linux parallel-processing multiprocessing

I have some code that I need to parallelize. The code itself works fine. It is a method of a Python class. For example:

class test:
     def __init__(self):
         <...>
     def method(self):
         <...>

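I wrote it this way because the details of the full code are probably irrelevant, and it is long. At first I tried to parallelize this code (with just two instances):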

t1=test()
t2=test()
pr1=Process(target=t1.method, args=())
pr2=Process(target=t2.method, args=())
pr1.start()
pr2.start()
pr1.join()
pr2.join()

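But this didn't work. Not only did it run much slower than running one instance and then the other, there was also the problem that the class variables were not being modified. That last problem was solved, thanks to @MattDMo's answer in this thread, by creating a shared namespace, shared variables and shared lists: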

import multiprocessing as mp
<...>
self.manager=mp.Manager()
self.shared=self.manager.Namespace()
self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])
self.shared.V=V

But it still runs very slowly.

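At first I thought that, because I was running the code on a laptop with two cores, the two instances were saturating both cores and the computer slowed down because it could not quickly execute any other task. So I decided to try the code on a desktop PC with 6 cores (also a Linux system). That did not fix the problem: the parallelized version is still much slower. On the other hand, the desktop's CPU does not get very hot, as it does when I run compiled C code with multithreading. Does anyone know what is going on?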

The full code is here, and is also included below:

from math import exp
from pylab import *
from scipy.stats import norm
from scipy.integrate import ode
from random import gauss,random
from numpy import dot,fft
from time import time

import multiprocessing as mp
from multiprocessing import Pool
from multiprocessing import Process
from multiprocessing import Queue, Pipe
from multiprocessing import Lock, current_process


#Global variables

sec_steps=1000 #resolution (steps per second)
DT=1/float(sec_steps)
stdd=20 #standard deviation for retina random input
stdd2=20 #standard deviation for sigmoid

#FUNCTION TO APPROXIMATE NORMAL CUMULATIVE DISTRIBUTION FUNCTION

def sigmoid(x,mu,sigma):
    beta1=-0.0004406
    beta2=0.0418198
    beta3=0.9
    z=(x-mu)/sigma
    if z>8:
        return 1
    elif z<-8:
        return 0
    else:
        return 1/(1+exp(-sqrt(pi)*(beta1*z**5+beta2*z**3+beta3*z)))

#CLASSES

class retina: ##GAUSSIAN WHITE NOISE GENERATOR
    def __init__(self,mu,sigma):
        self.mu=mu
        self.sigma=sigma
    def create_pulse(self):
        def pulse():
            return gauss(self.mu,self.sigma)
            #return uniform(-1,1)*sqrt(3.)*self.sigma+self.mu
        return pulse
    def test_white_noise(self,N): #test frequency spectrum of random number generator for N seconds
        noise=[]
        pulse=self.create_pulse()
        steps=sec_steps*N+1
        t=linspace(0,N,steps)
        for i in t:
            noise.append(pulse())
        X=fft(noise)
        X=[abs(x)/(steps/2.0) for x in X]
        xlim([0,steps/N])
        xlabel("freq. (Hz)")
        ylabel("Ampl. (V)")
        plot((t*steps/N**2)[1:],X[1:],color='black')
        #savefig('./wnoise.eps', format='eps', dpi=1000)
        show()
        return noise


class cleft: #object: parent class for a synaptic cleft
    def __init__(self):
        self.shared=manager.Namespace()
        self.shared.preV=0.0 #pre-synaptic voltage
        self.shared.r=0.0 #proportion of channels opened
    Tmax=1.0  #mM
    mu=-35.0  #mV
    sigma=stdd2 #mV

    def T(self): #Receives presynaptic Voltage preV, returns concentration of the corresponding neurotransmitter.
        return self.Tmax*sigmoid(self.shared.preV,self.mu,self.sigma)
    def r_next(self): #Solves kinematic ode  -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        tau=1.0/(self.alfa*self.T()+self.beta)
        r_inf=self.alfa*self.T()*tau
        self.shared.r=r_inf+(self.shared.r-r_inf)*exp(-DT/tau)
    def DI(self,postV): #Receives PSP and computes resulting change in PSC
        return self.g*self.shared.r*(postV-self.restV)

class ampa_cleft(cleft): #Child class for ampa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5 #initial condition for r
        self.alfa=2.0
        self.beta=0.1
        self.restV=0.0
        self.g=0.1


class gaba_a_cleft(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.shared=manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtrn(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_inTOin(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-75.0
        self.g=0.2

class gaba_a_cleft_trnTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_a_cleft_inTOtcr(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.alfa=2.0
        self.beta=0.08
        self.restV=-85.0
        self.g=0.1

class gaba_b_cleft(cleft): #Child class for GABAa synaptic connection
    def __init__(self):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.shared.preV=0.0
        self.shared.r=0.5
        self.shared.R=0.5
        self.shared.X=0.5
        self.alfa_1=0.02
        self.alfa_2=0.03
        self.beta_1=0.05
        self.beta_2=0.01
        self.restV=-100.0
        self.g=0.06

        self.n=4
        self.Kd=100 #Dissociation constant
    def r_next(self): #Solves kinematic ode  SECOND MESSENGER -analytical solution- to find r after one time step DT (needs T and alfa and beta parameters)
        """ 
        runs the ode for one unit of time dt, as specified
        updates the previous r taken as initial condition
        """
        Q1=self.alfa_1*self.T()
        Q2=-Q1-self.beta_1
        R0=self.shared.R
        X0=self.shared.X
        self.shared.R=(Q1*(exp(Q2*DT)-1)+Q2*R0*exp(Q2*DT))/Q2
        self.shared.X=(exp(-self.beta_2*DT)*(self.alfa_2*(self.beta_2*(exp(DT*(self.beta_2+Q2))*(Q1+Q2*R0)+Q1*(-exp(self.beta_2*DT))-Q2*R0)-Q1*Q2*(exp(self.beta_2*DT)-1))+self.beta_2*Q2*X0*(self.beta_2+Q2)))/(self.beta_2*Q2*(self.beta_2+Q2))
        self.shared.r=self.shared.X**self.n/(self.shared.X**self.n+self.Kd)

#######################################################################################################################################################

class neuronEnsemble:
    def __init__(self,V):  #Parent class for a Neuron ensemble
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
    kappa=1.0 #conductance
    def V_next(self):  #ode analitycally for a single time step DT 
        K1=self.C[0]*self.g/self.kappa
        K2=(-dot(self.C,self.I)+self.C[0]*self.g*self.restV)/self.kappa
        self.shared.V=K2/K1+(self.shared.V-K2/K1)*exp(-K1*DT)

class TCR_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-55.0 #rest of leak
        self.C=(1.0,7.1,1.0/2.0*30.9/4.0,1.0/2.0*3.0*30.9/4.0,1.0/2.0*30.9)     #Cleak,C2,C3,C4,C7!!  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class TRN_neuronEnsemble(neuronEnsemble):
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-72.5 #rest of leak
        self.C=(1.0,15.0,35.0,0.0,0.0)  #Cleak,C5,C8  #connectivity constants to the ensemble
                        #First one is Cleak, the others in same order as in diagram

class IN_neuronEnsemble(neuronEnsemble): #!!! update all parameters !!!
    def __init__(self,V):
        self.manager=mp.Manager()
        self.shared=self.manager.Namespace()
        self.I=self.manager.list([0.0,0.0,0.0,0.0,0.0])  #Variables to store changes in PSC produced by synaptic connection 
        self.shared.V=V    #Actual state of the membrane potential
        self.g=0.01 #conductance of leak
        self.restV=-70.0 #rest of leak
        self.C=(1.0,47.4,23.6,0.0,0.0)  #Cleak,C1,C6!!  #connectivity constants to the ensemble
                                #First one is Cleak, the others in same order as in diagram
######################################INSTANCE GROUP#################################################################
class group:
    def __init__(self,tcr_V0,trn_V0,in_V0):
        #Declarations of instances
        ####################

        #SYNAPTIC CLEFTS
        self.cleft_ret_in=ampa_cleft() #cleft between retina and IN ensemble
        self.cleft_ret_tcr=ampa_cleft() #cleft between retina and TCR ensemble
        self.cleft_in_in=gaba_a_cleft_inTOin() #cleft between IN and IN ensembles
        self.cleft_in_tcr=gaba_a_cleft_inTOtcr() #cleft between IN and TCR ensembles
        self.cleft_tcr_trn=ampa_cleft() #cleft between TCR and TRN ensembles
        self.cleft_trn_trn=gaba_a_cleft_trnTOtrn() #cleft between TRN and TRN ensembles
        self.cleft_trn_tcr_a=gaba_a_cleft_trnTOtcr() #cleft between TRN and TCR ensembles GABAa
        self.cleft_trn_tcr_b=gaba_b_cleft() #cleft between TRN and TCR ensembles GABAb
        #POPULATIONS    
        self.in_V0=in_V0 #mV i.c excitatory potential
        self.IN=IN_neuronEnsemble(self.in_V0) #create instance of IN ensemble

        self.tcr_V0=tcr_V0 #mV i.c excitatory potential
        self.TCR=TCR_neuronEnsemble(self.tcr_V0) #create instance of TCR ensemble

        self.trn_V0=trn_V0 #mV i.c inhibitory potential
        self.TRN=TRN_neuronEnsemble(self.trn_V0) #create instance of TCR ensemble
    def step(self,p): #makes a step of the circuit for the given instance
        #UPDATE TRN
        self.cleft_tcr_trn.shared.preV=self.TCR.shared.V #cleft takes presynaptic V
        self.cleft_tcr_trn.r_next()   #cleft updates r
        self.TRN.I[2]=self.cleft_tcr_trn.DI(self.TRN.shared.V) #update PSC TCR--->TRN 

        self.cleft_trn_trn.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_trn.r_next()   #cleft updates r
        self.TRN.I[1]=self.cleft_trn_trn.DI(self.TRN.shared.V) #update PSC TRN--->TRN

        self.TRN.V_next()  #update PSP in TRN

        #record retinal pulse ------|> IN AND TCR
        self.cleft_ret_in.shared.preV=self.cleft_ret_tcr.shared.preV=p

        #UPDATE TCR
        self.cleft_ret_tcr.r_next()      #cleft updates r
        self.TCR.I[1]=self.cleft_ret_tcr.DI(self.TCR.shared.V) #update PSC RET---|> TCR

        self.cleft_trn_tcr_b.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_b.r_next()   #cleft updates r
        self.TCR.I[2]=self.cleft_trn_tcr_b.DI(self.TCR.shared.V)  #update PSC

        self.cleft_trn_tcr_a.shared.preV=self.TRN.shared.V #cleft takes presynaptic V
        self.cleft_trn_tcr_a.r_next()   #cleft updates r
        self.TCR.I[3]=self.cleft_trn_tcr_a.DI(self.TCR.shared.V) #cleft updates r

        self.cleft_in_tcr.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_tcr.r_next()   #cleft updates r
        self.TCR.I[4]=self.cleft_in_tcr.DI(self.TCR.shared.V) #update PSC

        self.TCR.V_next()

        #UPDATE IN

        self.cleft_ret_in.r_next()       #cleft updates r
        self.IN.I[1]=self.cleft_ret_in.DI(self.IN.shared.V) #update PSC

        self.cleft_in_in.shared.preV=self.IN.shared.V #cleft takes presynaptic V
        self.cleft_in_in.r_next()   #cleft updates r
        self.IN.I[2]=self.cleft_in_in.DI(self.IN.shared.V)  #update PSC

        self.IN.V_next()
                #----------------------------------------
    def stepN(self, p, N, data_Vtcr, data_Vtrn, data_Vin): #makes N steps, receives a vector of N retinal impulses and output lists
        data_Vtcr.append(self.tcr_V0)
        data_Vtrn.append(self.trn_V0)
        data_Vin.append(self.in_V0)
        for i in xrange(N):
            self.step(p[i])
            data_Vtcr.append(self.TCR.shared.V)     #write to output list
            data_Vtrn.append(self.TRN.shared.V)
            data_Vin.append(self.IN.shared.V)
            name=current_process().name
            print name+" "+str(i)

######################################################################################################################
############################### CODE THAT RUNS THE SIMULATION OF THE MODEL ###########################################
######################################################################################################################

def run(exec_t): 
    """
    runs the simulation for t=exec_t seconds
    """
    t_0=time()
    mu=-45.0 #mV
    sigma=stdd  #20.0 #mV
    ret=retina(mu,sigma) #create instance of white noise generator
    #initial conditions
    tcr_V0=-61.0 #mV i.c excitatory potential
    trn_V0=-84.0 #mV i.c inhibitory potential
    in_V0=-70.0 #mV i.c excitatory potential
    ###########################LISTS FOR STORING DATA POINTS################################
    t=linspace(0.0,exec_t,exec_t*sec_steps+1)
#   data_Vtcr=[]
#   data_Vtcr.append(tcr_V0)
#
#   data_Vtrn=[]
#   data_Vtrn.append(trn_V0)
#
#   data_Vin=[]
#   data_Vin.append(in_V0)
#   ###NUMBER OF INSTANCES
#   N=2
#   pulse=ret.create_pulse()
#   #CREATE INSTANCES
#   groupN=[]
#   for i in xrange(N):
#       g=group(in_V0,tcr_V0,trn_V0)
#       groupN.append(g)
#
#   for i in t[1:]:
#       p=pulse()
#       proc=[]
#       for j in xrange(N):
#           pr=Process(name="group_"+str(j),target=groupN[j].step, args=(p,))
#           pr.start()
#           proc.append(pr)
#       for j in xrange(N):
#           proc[j].join(N)
#
#       data_Vtcr.append((groupN[0].TCR.shared.V+groupN[1].TCR.shared.V)*0.5)     #write to output list
#       data_Vtrn.append((groupN[0].TRN.shared.V+groupN[1].TRN.shared.V)*0.5)
#       data_Vin.append((groupN[0].IN.shared.V+groupN[1].IN.shared.V)*0.5)
#############FOR LOOPING INSIDE INSTANCE ---FASTER#############################################
    #CREATE p vector of retinal pulses
    p=[]
    pulse=ret.create_pulse()
    for k in xrange(len(t)-1):
        p.append(pulse())   
    #CREATE INSTANCES
    N=2
    groupN=[]
    proc=[]

    manager=mp.Manager() #creating a shared namespace

    data_Vtcr_0 = manager.list()
    data_Vtrn_0 = manager.list()
    data_Vin_0  = manager.list()

    data_Vtcr_1 = manager.list()
    data_Vtrn_1 = manager.list()
    data_Vin_1  = manager.list()

    data_Vtcr=[data_Vtcr_0, data_Vtcr_1]
    data_Vtrn=[data_Vtrn_0, data_Vtrn_1]
    data_Vin=[data_Vin_0, data_Vin_1]

    for j in xrange(N):
        g=group(tcr_V0,trn_V0,in_V0)
        groupN.append(g)
    for j in xrange(N):
        pr=Process(name="group_"+str(j),target=groupN[j].stepN, args=(p, len(t)-1, data_Vtcr[j], data_Vtrn[j], data_Vin[j],))
        pr.start()
        proc.append(pr)
    for j in xrange(N):
        proc[j].join()
    data_Vtcr_av=[0.5*i for i in map(add, data_Vtcr[0], data_Vtcr[1])]
    data_Vtrn_av=[0.5*i for i in map(add, data_Vtrn[0], data_Vtrn[1])]
    data_Vin_av =[0.5*i for i in map(add, data_Vin[0],  data_Vin[1])]

    print len(t), len(data_Vtcr[0]), len(data_Vtcr_av)
    ##Plotting#####################################
    subplot(3,1,1)
    xlabel('t')
    ylabel('tcr - mV')
    plot(t[50*sec_steps:],array(data_Vtcr_av)[50*sec_steps:], color='black')

    subplot(3,1,2)
    xlabel('t')
    ylabel('trn - mV')
    plot(t[50*sec_steps:],array(data_Vtrn_av)[50*sec_steps:], color='magenta')

    subplot(3,1,3)
    xlabel('t')
    ylabel('IN - mV')
    plot(t[50*sec_steps:],array(data_Vin_av)[50*sec_steps:], color='red')

    #savefig('./v_tcr.eps', format='eps', dpi=1000)
    ###############################################

    t_1=time() #measure elapsed time
    print "elapsed time: ", t_1-t_0, " seconds."
    #save data to file
    FILE=open("./output.dat","w")
    FILE.write("########################\n")
    FILE.write("# t                                                           V       #\n")
    FILE.write("########################\n")
    for k in range(len(t)):
        FILE.write(str(t[k]).zfill(5)+"\t"*3+repr(data_Vtcr_av[k])+"\n")
    FILE.close()
    #################
    show()
    return t,array(data_Vtcr)

######################################################################################################################
######################################################################################################################
if __name__ == "__main__":
    run(60)    #run simulation for 60 seconds

dan*_*ano 9

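Your problem is that you are relying too heavily on multiprocessing.Manager Proxy objects for your mathematical calculations. I tried to warn you about this drawback of multiprocessing.Manager when I answered your original question, but my wording was not strong enough. This is what I said: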

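Keep in mind that multiprocessing.Manager starts a child process to manage all the shared instances you create, and every time you access one of the Proxy instances you are actually making an IPC call to that Manager process.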

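I should have added: "IPC calls are much more expensive than ordinary access within the same process." Your original question did not really indicate how extensively you would be using these Manager instances, so I did not think to emphasise it.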

Consider this simple example, which just reads from a Proxy variable in a loop:

>>> timeit.timeit("for _ in range(1000): x = v + 2", setup="v = 0", number=1000)
0.040110111236572266
>>> timeit.timeit("for _ in range(1000): x = shared.v + 2", 
                  setup="import multiprocessing ; m = multiprocessing.Manager() ; shared = m.Namespace(); shared.v = 0", 
                  number=1000)
15.048354864120483

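Introducing the shared variable makes the loop almost 400 times slower. This example is a bit extreme, because we are accessing the shared variable in a tight loop, but the point stands: accessing Proxy variables is slow, and you do it a lot in your program. The extra overhead of accessing the Proxy objects far outweighs what you gain by running two processes concurrently.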
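As a rough illustration of what cutting down Proxy traffic can look like, the sketch below contrasts a loop that touches a Namespace attribute on every iteration with one that reads it once, works on a plain local float, and writes the result back once. The function names (`accumulate_via_proxy`, `accumulate_locally`, `demo`) are hypothetical, and the example assumes Python 3 on a Unix-like system where the `fork` start method is available.

```python
import multiprocessing as mp


def accumulate_via_proxy(shared, n):
    """Touch the Proxy on every iteration: one read and one write IPC call per step."""
    for _ in range(n):
        shared.v = shared.v + 1.0


def accumulate_locally(shared, n):
    """Read the Proxy once, work on a plain float, write back once."""
    v = shared.v            # single IPC read
    for _ in range(n):
        v = v + 1.0         # ordinary in-process arithmetic
    shared.v = v            # single IPC write


def demo(n=1000):
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        shared = manager.Namespace()

        shared.v = 0.0
        accumulate_via_proxy(shared, n)
        slow_result = shared.v

        shared.v = 0.0
        accumulate_locally(shared, n)
        fast_result = shared.v
    return slow_result, fast_result
```

Both functions should produce the same total; only the number of round trips to the Manager process differs, which is where the time goes.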

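You will need to refactor this code significantly to keep the use of Proxy variables to an absolute minimum. You may have more success replacing most of your multiprocessing.Namespace usage with multiprocessing.Value, which is stored in shared memory rather than in a separate process. That makes it much faster (though still much slower than a regular variable):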

>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0)", number=1000)
0.29022717475891113

It gets faster still if you initialise it with lock=False:

>>> timeit.timeit("for _ in range(1000): x = v.value + 2", setup="import multiprocessing ; v = multiprocessing.Value('i', 0, lock=False)", number=1000)
0.06386399269104004

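But then the Value is no longer automatically process-safe. If the variable may be changed by two processes at the same time, you will need to create a multiprocessing.Lock explicitly and acquire it to synchronise access.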
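A minimal sketch of that explicit locking, assuming Python 3 on a Unix-like system with the `fork` start method: two processes increment a `lock=False` Value, and a separate Lock guards each read-modify-write. The helper names `add_many` and `run_workers` are made up for this illustration.

```python
import multiprocessing as mp


def add_many(counter, lock, n):
    """Increment a lock-free shared int n times, guarding each update explicitly."""
    for _ in range(n):
        with lock:              # makes the read-modify-write atomic across processes
            counter.value += 1


def run_workers(n_procs=2, n=1000):
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    counter = ctx.Value('i', 0, lock=False)   # raw shared int, no built-in lock
    lock = ctx.Lock()                         # the explicit lock described above
    procs = [ctx.Process(target=add_many, args=(counter, lock, n))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value
```

Without the `with lock:` line, increments from the two processes could interleave and some updates might be lost. Taking the lock on every step also has a cost, so it is usually worth holding it only around the few places where shared state really changes.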

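The only other limitation of multiprocessing.Value is that you are restricted to the types supported by the ctypes or array modules. That should actually be fine for you, since you mostly use ints and floats. The only parts you might need to keep as Proxy instances are the lists, though you can probably use a multiprocessing.Array for those as well.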
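One possible shape for such a refactor is sketched below, assuming Python 3 on a Unix-like system with the `fork` start method. The five-element list becomes a multiprocessing.Array of doubles and the voltage a multiprocessing.Value. The worker copies both into local variables, loops on those, and writes the results back once. The update rule inside the loop is a placeholder, not the model's equations, and `simulate` and `run_simulation` are hypothetical names.

```python
import multiprocessing as mp


def simulate(currents, voltage, steps):
    """Run the loop on local copies; touch shared memory only at the start and end."""
    i_local = currents[:]        # copy the shared array into a plain list once
    v = voltage.value            # copy the shared double into a plain float once
    for _ in range(steps):
        v += 1.0                 # placeholder dynamics, NOT the model's equations
        i_local[1] = v
    currents[:] = i_local        # write everything back in one slice assignment
    voltage.value = v


def run_simulation(steps=10):
    # Assumption: a Unix-like system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    currents = ctx.Array('d', [0.0] * 5)   # shared-memory replacement for manager.list
    voltage = ctx.Value('d', 0.0)          # shared-memory replacement for shared.V
    p = ctx.Process(target=simulate, args=(currents, voltage, steps))
    p.start()
    p.join()
    return voltage.value, currents[:]
```

The parent reads the results only after `join()`, so no extra locking is needed here. If several processes had to exchange values during the loop, this pattern would need more care.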