如何在R中异步查询多个数据库

dav*_*uto 5 postgresql asynchronous r

我目前正在使用存储连接列表和 for 循环查询多个数据库,但我想通过异步查询所有数据库来加快进程

我在网上阅读了有关承诺功能的信息,但它没有按预期工作。

for(connection in databases)
{
    temp <- data.table(dbGetQuery(connection, "super secret sql query"))
    results <- rbind(results, temp)
    dbDisconnect(connection)
}

results$sum <- as.numeric(results$sum)
return(results)}
Run Code Online (Sandbox Code Playgroud)

我想将此 for 循环更改为单个执行语句,以在多个数据库中发出相同的查询并返回结果。

r2e*_*ans 4

因为future尝试自动将变量传输到节点中,并且不会使用外部指针(包括数据库连接对象)传输变量,所以您需要指定一个包装器来为您执行此操作。这是一个建议,尚未经过测试,但可以为您提供一个开始。

更新:我认为陈旧的连接最好在驱动程序级别完成,所以我建议使用pool. (如果您知道如何在没有 的情况下最好地了解连接是否未过期tryCatch(DBI::dbGetQuery(...), error=function(e) "expired"),那么我会洗耳恭听……大多数 ODBC 驱动程序及其odbc本身对“有效连接”在这种情况下的含义都有短视的看法。)

cred <- list(drv = odbc::odbc(), server = "server.address", user = "me", password = "secret")
mydb <- function(cred) {
  library(DBI)
  library(odbc)
  library(pool)
  if (exists(".cred") && !is.null(.cred) && !identical(.cred, cred)) {
    if (exists(".pool") && !is.null(.pool)) {
      pool::poolClose(.pool)
      .pool <<- NULL
    }
    .cred <<- NULL
  }
  if (!exists(".pool") || is.null(.pool)) {
    .pool <<- do.call(pool::dbPool, cred)
    .cred <<- cred
  }
  conn <- pool::poolCheckout(.pool)
  # hack to always return the pool object, don't "leak" it
  do.call(on.exit, list(substitute(suppressWarnings(pool::poolReturn(conn)))),
          envir = parent.frame())
  conn
}
Run Code Online (Sandbox Code Playgroud)

它做出了一个有点草率的决定,将每个节点的全局环境中的可行连接(及其凭据)存储在一个点变量中,该变量旨在不与其他任何内容发生冲突。cred应该可以很好地传输到节点,因为它只是一个list. mydb(cred)如果不存在,将创建一个新连接;如果存在并且具有相同的凭据,则传递旧连接;如果凭据因某种原因发生更改,则删除旧连接并创建一个新连接。

概念验证:

library(DBI)
library(odbc)
library(pool)
library(future)
library(future.apply) # only required for this demo, future_lapply
cl <- parallel::makeCluster(3)
plan(cluster, workers=cl)
cred <- list(driver = odbc::odbc(), server = "sqlserver.ip.address", user = "me", password = "secret")

DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
#   R_pid
# 1  7500
DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
#   R_pid
# 1  7500

### single future call
a %<-% DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
a
#   R_pid
# 1  9732

### multiple future calls
future_lapply(1:4, function(ign) DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid")))
# [[1]]
#   R_pid
# 1  9732
# [[2]]
#   R_pid
# 1  6132
# [[3]]
#   R_pid
# 1  6132
# [[4]]
#   R_pid
# 1  8480
Run Code Online (Sandbox Code Playgroud)

虽然尝试是为了不泄漏数据库对象,但出于某种原因,我仍然收到有关泄漏对象的警告......这表明我的幻想on.exit(..., envir=parent.frame())并没有做我希望做的一切。我认为这个警告相对温和,但它确实表明连接管理有些草率。

我对加载包有点明确,因为否则我会看到以下形式的错误:

# Error in (function (classes, fdef, mtable)  : 
#   unable to find an inherited method for function 'dbGetQuery' for signature '"Microsoft SQL Server", "character"'
Run Code Online (Sandbox Code Playgroud)

虽然我正在针对 mssql 进行测试,而您正在使用 postgresql,但我认为问题对此无关紧要。通过对包的显式控制和代码中的一些其他细微差别可以解决这个问题。