Julia:使用带有Arrays和SharedArrays的pmap

Lan*_*don 7 parallel-processing matrix pmap julia

我已经在Julia工作了几个月了,我有兴趣并行写一些代码.我正在研究一个问题,我使用1个模型为几个不同的接收器生成数据(每个接收器的数据是一个向量).每个接收器的数据可以独立计算,这使我相信我应该能够使用pmap函数.我的计划是将数据初始化为2D SharedArray(每列代表1个接收器的数据),然后在每个列上进行pmap循环.但是我发现将SharedArray与pmap一起使用并不比使用map的串行工作快.我写了下面的虚拟代码来说明这一点.

@everywhere function Dummy(icol,model,data,A,B)
    nx = 250
    nz = 250
    nh = 50
    for ih = 1:nh
        for ix = 1:nx
            for iz = 1:nz
                data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
            end
        end
    end
end


function main()

    nx = 250
    nz = 250
    nh = 50

    nt = 500
    ncol = 100

    model1 = rand(nz,nx,nh)
    model2 = copy(model1)
    model3 = convert(SharedArray,model1)

    data1 = zeros(Float64,nt,ncol)
    data2 = SharedArray(Float64,nt,ncol)
    data3 = SharedArray(Float64,nt,ncol)

    A1 = rand(nz,nx,nh)
    A2 = copy(A1)
    A3 = convert(SharedArray,A1)

    B1 = rand(nz,nx,nh)
    B2 = copy(B1)
    B3 = convert(SharedArray,B1)


    @time map((arg)->Dummy(arg,model1,data1,A1,B1),[icol for icol = 1:ncol])
    @time pmap((arg)->Dummy(arg,model2,data2,A2,B2),[icol for icol = 1:ncol])
    @time pmap((arg)->Dummy(arg,model3,data3,A3,B3),[icol for icol = 1:ncol])

    println(data1==data2)
    println(data1==data3)

end

main() 
Run Code Online (Sandbox Code Playgroud)

我启动Julia会话Julia -p 3并运行脚本.3次测试的时间分别为1.4s,4.7s和1.6s.与带有map(1.4s)的常规数组相比,使用带有pmap的SharedArrays(1.6s运行时)并没有提供任何速度上的改进.我也很困惑为什么第二种情况(数据作为SharedArray,所有其他输入作为带有pmap的常规数组)是如此之慢.为了从并行工作中获益,我需要改变什么?

Mic*_*gge 7

前言:是的,实际上有一个问题的解决方案.请参阅底部的代码.但是,在我到达那里之前,我会进行一些解释.

我认为这里问题的根源是内存访问.首先,虽然我没有对它进行严格的调查,但我怀疑可以对Julia的底层代码进行适度的改进,以改进它在并行处理中处理内存访问的方式.然而,在这种情况下,我怀疑基本代码的任何潜在问题,如果实际存在的话,并没有那么多错.相反,我认为仔细考虑代码中究竟发生了什么以及它对内存访问的意义是有用的.

  1. 在Julia中工作时要记住的一个关键事项是它以列主要顺序存储数组.也就是说,它将它们作为列堆叠存储在彼此之上.这也推广到> 2的维度.请参见的朱莉娅性能提示非常有用段的更多信息.这意味着在单个列中快速访问一行接一行.但是,如果你需要跳过列,那么你就会遇到麻烦.是的,访问ram内存可能相对较快,但访问缓存内存要快得多,所以如果你的代码允许将一个列从ram加载到缓存然后继续工作,那么你会做很多事情比你需要在ram和缓存之间进行大量交换更好.在您的代码中,您在计算之间从列到列切换,就像没有人的业务一样.例如,在pmap每个进程中获取共享数组的不同列以进行处理.然后,每个都沿着该列的行向下并修改其中的值.但是,由于它们正在尝试彼此并行工作,并且整个数组太大而无法容纳到缓存中,因此ram和缓存之间会发生大量交换,这会让您感到沮丧.从理论上讲,也许可以设计一个足够聪明的引擎盖内存管理系统来解决这个问题,但我真的不知道 - 这超出了我的工资等级.当然,您访问其他对象时也会发生同样的事情.

  2. 在并行化时,要记住的另一件事是你的触发器(即计算机计算)与读/写操作的比率.触发器往往很好地并行化,你可以拥有不同的内核,进程等,对自己在微小缓存中保存的数据位进行自己的小计算.但是,读/写操作不能很好地并行化.可以通过一些方法来设计硬件系统以改进这一点.但总的来说,如果你有一个具有两个核心的给定计算机系统,并且你再添加四个核心,那么你执行翻牌的能力会增加三倍,但是你能够从ram读取/写入数据的能力赢了"真的改善了这么多.(注意:这是一个过度紧张,很大程度上取决于你的系统).然而,一般来说,触发器与读/写的比率越高,并行性就越有利.在您的情况下,您的代码涉及相当少的读取/写入(所有这些访问到您的不同阵列)的相对少量的触发器(一些多重插图和一个附加).这只是要记住的事情.

  3. 幸运的是,如果正确写入,你的情况可以从并行性中获得一些好的加速.根据我对Julia的经验,当我打破数据并让工作人员分别处理数据时,我所有最成功的并行性就出现了.你的情况恰好适合那种情况.下面是我写的一些代码的例子.你可以看到它从一个处理器到三个处理器的速度提高了近3倍.代码有点粗糙,但它至少证明了如何处理这样的事情的一般想法.之后我对代码发表了一些评论.

addprocs(3)

nx = 250;
nz = 250;
nh = 50;
nt = 250;
@everywhere ncol = 100;

model = rand(nz,nx,nh);

data = SharedArray(Float64,nt,ncol);

A = rand(nz,nx,nh);

B = rand(nz,nx,nh);

function distribute_data(X, obj_name_on_worker::Symbol, dim)
    size_per_worker = floor(Int,size(X,1) / nworkers())
    StartIdx = 1
    EndIdx = size_per_worker
    for (idx, pid) in enumerate(workers())
        if idx == nworkers()
            EndIdx = size(X,1)
        end
        println(StartIdx:EndIdx)
        if dim == 3
            @spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
        elseif dim == 2
            @spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
        end
        StartIdx = EndIdx + 1
        EndIdx = EndIdx + size_per_worker - 1
    end
end

distribute_data(model, :model, 3)
distribute_data(A, :A, 3)
distribute_data(B, :B, 3)
distribute_data(data, :data, 2)

@everywhere function Dummy(icol,model,data,A,B)
    nx = size(model, 2)
    nz = size(A,1)
    nh = size(model, 3)
    for ih = 1:nh
        for ix = 1:nx
            for iz = 1:nz
                data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
            end
        end
    end
end

regular_test() = map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])

function parallel_test()
    @everywhere begin
        if myid() != 1
            map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
        end
    end
end

@time regular_test(); # 2.120631 seconds (307 allocations: 11.313 KB)
@time parallel_test(); # 0.918850 seconds (5.70 k allocations: 337.250 KB)

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
function recombine_data(Data::Symbol)
    Results = cell(nworkers())
    for (idx, pid) in enumerate(workers())
        Results[idx] = getfrom(pid, Data)
    end
    return vcat(Results...)
end

@time P_Data = recombine_data(:data); # 0.003132 seconds

P_Data == data  ## true
Run Code Online (Sandbox Code Playgroud)

评论

  • 在这里使用它SharedArray是相当多余的.我只是使用它,因为它很容易在适当的位置进行修改,这就是你的代码最初编写的方式.这让我可以根据你所写的内容更直接地工作而不必修改它.

  • 我没有在计时器中包含将数据恢复的步骤,但正如您所看到的,在这种情况下,这是非常微不足道的时间.在其他情况下,它可能不那么简单,但数据移动只是您在并行性方面遇到的问题之一.

  • 在进行一般的时间试验时,最好的做法是运行一次函数(为了编译代码),然后再次运行它以获取时间.这就是我在这里所做的.

  • 看到这篇SO 帖子,我在这里获得了一些这些函数的灵感.