Ant*_*uve 8 parallel-processing julia
首先我的问题:
现在的细节:
我有这个程序:
data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
fct(data)::DataFrame
end
Run Code Online (Sandbox Code Playgroud)
它的功能很好,但是对另一个worker的fct(data)的每次并行调用都会复制整个数据框,使得一切都变得非常缓慢.
理想情况下,我想加载一次数据,并始终在每个工作程序上使用预先加载的数据.我想出了这样的代码:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
if (myid()-1) % nworkers()
fct = filter_functions[i]
filtered_data_temp = fct(data)
end
# How to vcat all the filtered_data_temp ?
end
Run Code Online (Sandbox Code Playgroud)
但是现在我还有另外一个问题:我无法弄清楚如何使用myid()== 1将所有filtered_data_temp vcat()放到worker中的变量上.
我非常感谢任何见解.
注意:我知道在Julia中的大型常量数据结构上并行操作.然而,我不相信它适用于我的问题,因为我的所有filter_functions都作为一个整体在数组上运行.
Fel*_*ema 10
您可能希望查看/加载数据到分布式阵列中
编辑:可能是这样的:
data = DataFrames.readtable("...")
dfiltered_data = distribute(data) #distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ]
for fct in filter_functions
dfiltered_data = fct(dfiltered_data)::DataFrame
end
Run Code Online (Sandbox Code Playgroud)
您还可以查看单元测试以获取更多示例
毕竟,我在那里找到了我的问题的解决方案:Julia:如何将数据复制到 Julia 中的另一个处理器。
特别是,它引入了以下原语以便从另一个进程检索变量:
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
Run Code Online (Sandbox Code Playgroud)
下面是我的使用方法:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Executes the filter functions
@everywhere for i in 1:length(filter_functions)
local_results = ... # some type
if (myid()-1) % nworkers()
fct = filter_functions[i]
filtered_data_temp = fct(data)
local_results = vcat(local_results, filtered_data_temp)
end
# How to vcat all the filtered_data_temp ?
end
# Concatenate all the local results
all_results = ... # some type
for wid in 1:workers()
worker_local_results = getfrom(wid, :local_results)
all_results = vcat(all_results,worker_local_results)
end
Run Code Online (Sandbox Code Playgroud)