Julia中大数据的并行计算

Ant*_*uve 8 parallel-processing julia

首先我的问题:

  • 是否有可能阻止Julia每次在并行for循环中复制变量?
  • 如果没有,如何在Julia中实现并行减少操作?

现在的细节:

我有这个程序:

data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
  fct(data)::DataFrame
end
Run Code Online (Sandbox Code Playgroud)

它的功能很好,但是对另一个worker的fct(data)的每次并行调用都会复制整个数据框,使得一切都变得非常缓慢.

理想情况下,我想加载一次数据,并始终在每个工作程序上使用预先加载的数据.我想出了这样的代码:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
  if (myid()-1) % nworkers()
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
  end
  # How to vcat all the filtered_data_temp ?
end
Run Code Online (Sandbox Code Playgroud)

但是现在我还有另外一个问题:我无法弄清楚如何使用myid()== 1将所有filtered_data_temp vcat()放到worker中的变量上.

我非常感谢任何见解.

注意:我知道在Julia中的大型常量数据结构上并行操作.然而,我不相信它适用于我的问题,因为我的所有filter_functions都作为一个整体在数组上运行.

Fel*_*ema 10

您可能希望查看/加载数据到分布式阵列中

编辑:可能是这样的:

data = DataFrames.readtable("...")
dfiltered_data = distribute(data) #distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ] 
for fct in filter_functions
  dfiltered_data = fct(dfiltered_data)::DataFrame
end
Run Code Online (Sandbox Code Playgroud)

您还可以查看单元测试以获取更多示例

  • 您可能还想考虑`SharedArray`s,如果您的所有数据都在一个进程上启动,并且您不想为将它们移动到另一个进程付出代价. (2认同)

Ant*_*uve 4

毕竟,我在那里找到了我的问题的解决方案:Julia:如何将数据复制到 Julia 中的另一个处理器

特别是,它引入了以下原语以便从另一个进程检索变量:

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
Run Code Online (Sandbox Code Playgroud)

下面是我的使用方法:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Executes the filter functions
@everywhere for i in 1:length(filter_functions)
  local_results = ... # some type
  if (myid()-1) % nworkers()
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
    local_results = vcat(local_results, filtered_data_temp)
  end
  # How to vcat all the filtered_data_temp ?
end
# Concatenate all the local results
all_results = ... # some type
for wid in 1:workers()
  worker_local_results = getfrom(wid, :local_results)
  all_results = vcat(all_results,worker_local_results)
end
Run Code Online (Sandbox Code Playgroud)