Lut*_*ett 6 parallel-processing multithreading r rate-limiting furrr
我必须从 Web API (NCBI entrez) 检索大型数据集,该数据集将我每秒的请求数限制为一定数量,例如 10 个(示例代码将在没有 API 密钥的情况下将您限制为 3 个)。我使用 Furrr 的 future_* 函数来并行化请求以尽快获取它们,如下所示:
library(tidyverse)
library(rentrez)
library(furrr)
plan(multiprocess)
api_key <- "<api key>"
# this will return a crap-ton of results
srch <- entrez_search("nuccore", "Homo sapiens", use_history=T, api_key=api_key)
total <- srch$count
per_request <- 500 # get 500 records per parallel request
nrequest <- total %/% per_request + as.logical(total %% per_request)
result <- future_map(seq(nrequest),function(x) {
rstart <- (x - 1) * per_request
return(entrez_fetch(
"nuccore",
web_history = srch$web_history,
rettype="fasta",
retmode="xml",
retstart=rstart,
retmax=per_request,
api_key=api_key
))
}
Run Code Online (Sandbox Code Playgroud)
显然,对于nrequest > 10(或无论限制是什么)的情况,我们将立即遇到速率限制。
我看到了两个看似明显简单的解决方案,两者似乎都有效。一种是在发出请求之前引入随机的短暂延迟,如下所示:
future_map(seq(nrequest),function(x) {
Sys.sleep(runif(1,0,5))
# ...do the request...
}
Run Code Online (Sandbox Code Playgroud)
plan(multiprocess,workers=<max_concurrent_requests>)第二种是通过使用semaphore包将并发请求数限制在速率限制内,并将信号量设置为速率限制,如下所示:
# this sort of assumes individual requests take long enough to cause
# a wait for the semaphore to be long enough
# for this case, they do
rate_limit <- 10
lock = semaphore(rate_limit)
result <- future_map(seq(nrequest),function(x) {
rstart <- (x - 1) * per_request
acquire(lock)
s <- entrez_fetch(
"nuccore",
web_history = srch$web_history,
rettype="fasta",
retmode="xml",
retstart=rstart,
retmax=per_request,
api_key=api_key
)
release(lock)
return(s)
}
Run Code Online (Sandbox Code Playgroud)
然而,我真正希望能够做的是限制请求速率而不是并发请求的数量。Quentin Pradet有一篇很棒的文章,介绍了如何在 python 中使用异步 io http 请求来做到这一点。我尝试将此应用于 R,但遇到了一个问题,即 future_* 函数中跨线程/进程共享的任何变量都是复制而不是实际共享,因此修改(即使受信号量锁保护)不会在之间共享线程/进程,因此不可能实现我们依赖于此方法工作的计数器存储桶!
是否有一种巧妙的方法来限制并行请求的速率,而不必限制同时请求的数量?还是我想得太多了,应该坚持限制数量?
| 归档时间: |
|
| 查看次数: |
744 次 |
| 最近记录: |