并行计算,dplyr 中 tidyr::complete 的替代方法是什么?

MCS*_*MCS 12 parallel-processing r dplyr multidplyr

我正在尝试并行化管道。在管道中有一个 tidyr 命令(“tidyr::complete”)。一旦并行运行,这就会分解代码,因为无法识别对象类。

dplyr 中是否有替代方法可以完成?

library(dplyr)
library(tidyr)
library(zoo)


test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
               var_1=c(1,1,1,1,1,1,2,2,2), 
               var_2=c(1,1,1,1,1,2,3,3,3), 
               var_3=c(0,5,NA,15,20,NA,1,NA,NA))

max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)

Run Code Online (Sandbox Code Playgroud)

串行


test_serial <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1, year = seq(min_year,max_year)) %>%
  mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))


Run Code Online (Sandbox Code Playgroud)

并行(失败)

devtools::install_github("hadley/multidplyr")
library(multidplyr)

cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))

test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>% 
  dplyr::group_by(var_1,var_2) %>% 
  tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
  dplyr::mutate(
    var_3 = na.approx(var_3,na.rm = FALSE),
    var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>% 
  collect()

Run Code Online (Sandbox Code Playgroud)

这是错误信息

Error in UseMethod("complete_") : 
  no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
Run Code Online (Sandbox Code Playgroud)

Wal*_*ldi 9

Multidplyr 允许您:

  1. 使用拆分数据 partition()
  2. 在专用节点上处理每个分区
  3. collect() 结果

并非所有数据处理任务都适合之前的工作流程。

特别是,complete需要知道输入数据所有可能的值才能创建缺失的行,这意味着这个操作作为一个整体是无法拆分的,这就是为什么没有适用的方法可用。

在您提供的示例中,每个节点将接收一var_1, var_2对,而无需知道其他节点得到了什么,这不允许并行实现预期结果。

但是,正如您已经知道的那样year = seq(min_year,max_year),您可以complete仅将此变量的任务并行化,将任务拆分为var_1,例如使用furrr包:

library(furrr)
plan(multiprocess)
test_parallel <- test %>% 
  group_by(var_1,var_2) %>% 
  complete(var_1) %>% split(.$var_1) %>% 
  furrr::future_map(~{
    complete(.x, year = seq(min_year,max_year)) %>%
    dplyr::mutate(
        var_3 = na.approx(var_3,na.rm = FALSE),
        var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) 
    }) %>% bind_rows()

> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+           c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE
Run Code Online (Sandbox Code Playgroud)

在更大的数据集上进行测试以衡量潜在的性能改进。

  • 在小型示例数据集上,多任务处理大多数时候会比较慢,因为您需要打开任务、将数据传输给它们并收集结果,这对于 15 行来说比直接处理数据需要更多的时间,请参阅我的最后一句话;- ) (4认同)