ale*_*ing 4 parallel-processing foreach domc rsqlite
我有一个大型数据库(~100Gb),我需要从中提取每个条目,对其执行一些比较,然后存储这些比较的结果。我尝试在单个 R 会话中运行并行查询,但没有成功。我可以同时运行多个 R 会话,但我正在寻找更好的方法。这是我尝试的:
library(RSQLite)
library(data.table)
library(foreach)
library(doMC)
#---------
# SETUP
#---------
#connect to db
db <- dbConnect(SQLite(), dbname="genes_drug_combos.sqlite")
#---------
# QUERY
#---------
# 856086 combos = 1309 * 109 * 6
registerDoMC(8)
#I would run 6 seperate R sessions (one for each i)
res_list <- foreach(i=1:6) %dopar% {
a <- i*109-108
b <- i*109
pb <- txtProgressBar(min=a, max=b, style=3)
res <- list()
for (j in a:b) {
#get preds for drug combos
statement <- paste("SELECT * from combo_tstats WHERE rowid BETWEEN", (j*1309)-1308, "AND", j*1309)
combo_preds <- dbGetQuery(db, statement)
#here I do some stuff to the result returned from the query
combo_names <- combo_preds$drug_combo
combo_preds <- as.data.frame(t(combo_preds[,-1]))
colnames(combo_preds) <- combo_names
#get top drug combos
top_combos <- get_top_drugs(query_genes, drug_info=combo_preds, es=T)
#update progress and store result
setTxtProgressBar(pb, j)
res[[ length(res)+1 ]] <- top_combos
}
#bind results together
res <- rbindlist(res)
}
Run Code Online (Sandbox Code Playgroud)
我没有收到任何错误,但只有一个核心旋转。相反,如果我运行多个 R 会话,我的所有核心都会执行它。我究竟做错了什么?
RSQLite我在同时访问同一个文件 SQLite 数据库时学到的一些东西:
parallel::clusterEvalQ(cl = cl, {
db.conn <- RSQLite::dbConnect(RSQLite::SQLite(), "./export/models.sqlite");
RSQLite::dbClearResult(RSQLite::dbSendQuery(db.conn, "PRAGMA busy_timeout=5000;"));
})
Run Code Online (Sandbox Code Playgroud)
PRAGMA busy_timeout=5000;默认情况下,此值设置为 0,并且每次您的工作线程尝试在锁定的数据库中写入数据时,您很可能会遇到“数据库已锁定”错误。前面的代码PRAGMA在每个工作连接中设置了这个。请注意,SELECT操作永远不会被锁定,而只会被锁定INSERT/DELETE/UPDATE。
PRAGMA journal_mode=WAL;只需设置一次,默认情况下将永远保持开启状态。它将向数据库添加两个(或多或少是永久性的)文件。它将提高并发读/写性能。在这里阅读更多内容。
通过上述设置,我还没有遇到这个问题。