在执行过程中停止 dplyr/tidyr 链并保存计算进度

Jef*_*ker 3 r dplyr tidyr tidyverse

我有一个编写的自定义函数,需要一段时间才能在大型数据集上运行,有时会停止运行。我的函数是一个窗口函数(例如cumsum)。如果我停止执行,所有数据都会丢失。有没有办法在数据传输过程中tidyr保存dplyr数据来避免这种情况?

我的数据采用宽格式,并且我在组(例如产品)和许多变量(例如指标)上运行该函数。

   Product Year           a           b           c           d
1        A 2012 -0.54884514 -0.15416417  0.54861146  1.04147041
2        A 2013  1.22642587  1.43655028 -0.71433978  0.23523411
3        A 2014 -1.49161792  0.53356645  0.44964089 -0.01657906
4        A 2015 -0.72283864 -0.30601369 -0.04536668 -1.24809562
5        A 2016  0.41150740  1.42205301  0.59239525  1.82255169
6        B 2012  0.07279991  1.87163670  1.45773252 -1.93302885
7        B 2013  1.02705536 -2.70856122  0.57013708  1.35345098
8        B 2014  1.35513596  0.05818042 -0.41595725 -2.07142883
9        B 2015  0.40750419  0.13024750 -0.89163416  0.44227276
10       B 2016  0.25391609  0.02908517 -1.62128177  1.83811852
11       C 2012 -0.70568556  0.37254186 -0.61830412 -1.61228981
12       C 2013 -0.97811352  0.73741264 -0.60743864  0.12820628
13       C 2014 -0.20605945 -1.26239900 -0.21926510 -0.29185710
14       C 2015 -1.07297893  2.17374995 -0.29045520 -0.15203030
15       C 2016 -1.51221585  0.87294266  0.26420813 -0.70152124
16       D 2012  0.44717558  0.07587063  0.62215522  0.76882890
17       D 2013 -1.71815014  2.60236385  0.14437641 -0.60752707
18       D 2014  0.50659673 -0.57601702  0.09140279 -1.18971359
19       D 2015 -1.27493812 -0.76221085  0.58623989  0.37937413
20       D 2016  2.03280890 -0.39427715  0.29775332  0.88033461
Run Code Online (Sandbox Code Playgroud)

如果我使用 tidy 方法,我可以只获取gather数据,然后group_by. 这可行,但我无法在不丢失所有进度的情况下停止执行中途。

# The tidy way
dt2 <- dt %>%
  gather(Metric,Value,3:6) %>%
  group_by(Product,Metric) %>%
  mutate(Metric2 = paste0(Metric,2),
         Value2 = cumsum(Value)) %>%
  ungroup() %>%
  select(-Value, -Metric) %>% # I would love to leave the original metric in if possible
  spread(Metric2,Value2)
Run Code Online (Sandbox Code Playgroud)

如果我不使用 tidy 方法,我可以随时停止执行并保存该点的结果。

# The non-tidy way
dt2 <- tibble()
#pb = txtProgressBar(min = 0, max = 4, initial = 0, style = 3)
for(i in 1:4) {
  single_product <- dt[which(dt$Product == unique(dt$Product)[i]),]
  for(j in 3:6) {
    single_metric <- single_product[,c(1:2,j)]
    single_metric[,paste0(colnames(single_metric[3]),2)] <- cumsum(single_metric[3])
    single_product <- left_join(single_product,single_metric)
  }
  dt2 <- bind_rows(dt2,single_product)
  #setTxtProgressBar(pb,i)
}
Run Code Online (Sandbox Code Playgroud)

如果我们可以添加进度条的话,会得到奖励积分。这是虚拟数据:

# The data
dt <- expand.grid(Product=LETTERS[1:4], Metric = letters[1:4], Year = 2012:2016)
dt$Value <- rnorm(nrow(dt))
dt <- dt %>%
  spread(Metric, Value)
Run Code Online (Sandbox Code Playgroud)

Rya*_*son 5

我能想到的保存进度的最简单方法是使用缓存。在下面的代码中,memoize_fun采用一个用于计算值的函数 ( )value_fun和一个用于计算该值的键 ( key_fun) 的函数。在本例中,键是 Product,值是要计算的完整数据帧该产品。我添加了消息来显示缓存何时被填充和使用。请注意,如果该do语句花费的时间超过几秒钟,dplyr 应该自动添加一个进度条。您应该在第一次运行时看到这一点,其中使用调用来人为地夸大运行时Sys.sleep

library(dplyr)
library(tidyr)
library(magrittr)
dt <- expand.grid(Product=LETTERS, Metric = letters[1:4], Year = 2012:2016)
dt$Value <- rnorm(nrow(dt))
dt <- dt %>%
  spread(Metric, Value)


my_cache <- list()
memoize_fun <- function(value_fun,  key_fun) {
    function(...) {
        key <- as.character(key_fun(...))
        message("Using key", deparse(key))
        assert_that(is.character(key))
        assert_that(length(key) == 1)
        if (! key %in% names(my_cache)) {
            message("Computing value for ", deparse(key))
            my_cache[[key]] <<- value_fun(...)
            Sys.sleep(1)
        } else {
            message("Re-using stored value for ", deparse(key))
        }
        return (my_cache[[key]])
    }
}

metrics <- colnames(dt)[3:6]

system.time({
    dt2 <- dt %>%
        group_by(Product) %>%
        do({
            value_fun <- . %>% cbind(., CumSum=transmute_all(.[metrics], cumsum))
            key_fun <- . %>% .$Product %>% .[1]
            memoize_fun(value_fun, key_fun)(.)
        })
})

## Run the same thing again to demonstrate that everything is cached
system.time({
    dt2 <- dt %>%
        group_by(Product) %>%
        do({
            value_fun <- . %>% cbind(., CumSum=transmute_all(.[metrics], cumsum))
            key_fun <- . %>% .$Product %>% .[1]
            memoize_fun(value_fun, key_fun)(.)
        })
})
Run Code Online (Sandbox Code Playgroud)

我们还可以证明,通过为每次计算添加 50% 的失败机会,然后将其包装在不断重试直到结束的代码中,这可以在出现随机错误的情况下重新启动:

my_cache <- list() # Reset the cache
finished <- FALSE
tries <- 1
while (! finished) {
    message("Attempt number ", tries)
    tryCatch({
        dt2 <- dt %>%
            group_by(Product) %>%
            do({
                value_fun <- . %>% cbind(., CumSum=transmute_all(.[metrics], cumsum)) %T>%
                    { if (runif(1) > 0.5) stop("Random error")}
                key_fun <- . %>% .$Product %>% .[1]
                memoize_fun(value_fun, key_fun)(.)
            })
        finished <- TRUE
    },
    error=function(...) NULL)
    tries <- tries + 1
}
Run Code Online (Sandbox Code Playgroud)