Cin*_*ker 2 r bigdata rsqlite data.table
我有 40 多个 CSV 文件,每个文件大约 400MB。我需要做的是读取这 40 多个大 csv 文件,对它们进行一些操作和格式化(例如通用日期格式、将日期分隔为月、日等),并将它们组合在一个数据框中。我在上一篇文章中搜索过将这些 CSV 文件读取为“fread”的最快方法,但即使我使用 fread,也需要大约 10 分钟。读取每个文件需要 14 秒,这给我留下了相当可观的运行时间。我尝试通过 RSQLite 使用 SQLite 来处理单个 csv 文件:
setwd("raw_data/sqldatabase")
db <- dbConnect(SQLite(), dbname="test_db.sqlite") ## will make, if not present
dbWriteTable(conn=db, name="your_table", value="testdata.csv", row.names=FALSE, header=TRUE)
Run Code Online (Sandbox Code Playgroud)
然而,即使使用 SQLite 也需要相当长的时间。有什么办法可以快速将40多个大型csv文件夹读取到一个“空间”中,使得操作速度非常快?
如果我将数据上传到数据库一次,并且如果它能够使操作变得非常快,我仍然没问题,但最终文件夹(合并完成后)预计为 25+GB。所以我试图找到最有效的方法来操纵数据
一种替代方案可能是“镶木地板数据集市”。这里的前提是:
.parquet。
.parquet可选:拆分一个或多个可索引(分类/序数)列后写入多个文件。arrow::open_dataset使用、dplyr和它们的惰性求值读取数据。虽然这不允许您一次将整个数据集读入内存,但它确实使您可以访问较小块中的所有数据,就好像它是一个大数据集一样。这仍然与data.table内存中工作兼容,用于arrow延迟访问数据。虽然我下面的示例使用data.table,但这不是必需的,实际上引入了一两个额外的步骤,以便将示例数据从 转换tibble为data.table。我建议这样做是因为数据量很大并且您对其进行了标记,而不是因为它是必需的。
这两个示例需要注意的一些事项:
ds反映了所有 336,776 行数据,尽管该对象相当小(只是一个引用文件和元数据的环境)。collect(). 不要尝试对整个数据执行此操作,除非您知道它可以放入内存中。data.table(来自fread),并write_parquet保留了框架的几个属性(包括那个),所以当我们实现下面的数据时,它将是 a data.table。collected 数据是data.table,但对所收集数据的更改不会迁移回 parquet 文件本身。这意味着如果你做了类似的事情collect(ds)[, newcol := 1],那么做另一件事collect(ds)就不会包含newcol其中。值得注意的是,parquet文件一旦写入就不可更改:它们无法更新或追加。实际例子:nycflights13::flights. 该数据包含 336,776 行“2013 年从 NYC 出发的所有航班的航空公司准点数据”。在本示例中,我将数据随机分成 5 个帧并保存到 CSV 文件中。
set.seed(42)
ind <- sample(1:5, size = nrow(nycflights13::flights), replace = TRUE)
head(ind)
dir.create("csv")
i <- 1L
for (dat in split(nycflights13::flights, ind)) {
fwrite(dat, sprintf("csv/%s.csv", i))
i <- i + 1L
}
file.info(Sys.glob("csv/*"))
# size isdir mode mtime ctime atime exe
# csv/1.csv 6274623 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:54 no
# csv/2.csv 6265804 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:52 no
# csv/3.csv 6261533 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:43 no
# csv/4.csv 6260298 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:49 no
# csv/5.csv 6235815 FALSE 666 2022-09-19 05:21:40 2022-09-19 05:21:40 2022-09-19 05:21:46 no
Run Code Online (Sandbox Code Playgroud)
对于下面的两个示例,我不会进行任何数据过滤/格式化/增强;除了突出显示在何处执行此操作之外,我假设您知道在保存之前需要对每个单独的 CSV 文件执行哪些操作。
dir.create("datamart")
for (fn in Sys.glob("csv/*.csv")) {
X <- fread(fn)
arrow::write_parquet(X, file.path("datamart", paste0(basename(fn), ".parquet")))
rm(X)
gc() # optional, might help
}
file.info(Sys.glob("datamart/*"))
# size isdir mode mtime ctime atime exe
# datamart/1.csv.parquet 1251629 FALSE 666 2022-09-19 05:26:28 2022-09-19 05:26:28 2022-09-19 05:35:59 no
# datamart/2.csv.parquet 1249485 FALSE 666 2022-09-19 05:26:45 2022-09-19 05:26:45 2022-09-19 05:35:59 no
# datamart/3.csv.parquet 1249652 FALSE 666 2022-09-19 05:26:47 2022-09-19 05:26:47 2022-09-19 05:35:59 no
# datamart/4.csv.parquet 1249772 FALSE 666 2022-09-19 05:26:48 2022-09-19 05:26:48 2022-09-19 05:35:59 no
# datamart/5.csv.parquet 1245022 FALSE 666 2022-09-19 05:26:49 2022-09-19 05:26:49 2022-09-19 05:35:59 no
Run Code Online (Sandbox Code Playgroud)
读入数据:
library(dplyr)
library(arrow)
ds <- open_dataset("datamart")
nrow(ds)
# [1] 336776
object.size(ds) # environment
# 504 bytes
with(ls.objects(envir = ds), sum(Size))
# [1] 145888
ds %>%
filter(month == 1, between(day, 1, 10))
# FileSystemDataset (query)
# year: int32
# month: int32
# day: int32
# dep_time: int32
# sched_dep_time: int32
# dep_delay: int32
# arr_time: int32
# sched_arr_time: int32
# arr_delay: int32
# carrier: string
# flight: int32
# tailnum: string
# origin: string
# dest: string
# air_time: int32
# distance: int32
# hour: int32
# minute: int32
# time_hour: timestamp[us, tz=UTC]
# * Filter: ((month == 1) and ((day >= 1) and (day <= 10)))
# See $.data for the source Arrow object
Run Code Online (Sandbox Code Playgroud)
ds %>%
filter(month == 1, between(day, 1, 10)) %>%
collect()
# year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight tailnum origin dest
# <int> <int> <int> <int> <int> <int> <int> <int> <int> <char> <int> <char> <char> <char>
# 1: 2013 1 1 554 600 -6 812 837 -25 DL 461 N668DN LGA ATL
# 2: 2013 1 1 555 600 -5 913 854 19 B6 507 N516JB EWR FLL
# 3: 2013 1 1 557 600 -3 709 723 -14 EV 5708 N829AS LGA IAD
# 4: 2013 1 1 558 600 -2 923 937 -14 UA 1124 N53441 EWR SFO
# 5: 2013 1 1 559 600 -1 941 910 31 AA 707 N3DUAA LGA DFW
# 6: 2013 1 1 607 607 0 858 915 -17 UA 1077 N53442 EWR MIA
# 7: 2013 1 1 613 610 3 925 921 4 B6 135 N635JB JFK RSW
# 8: 2013 1 1 615 615 0 833 842 -9 DL 575 N326NB EWR ATL
# 9: 2013 1 1 623 610 13 920 915 5 AA 1837 N3EMAA LGA MIA
# 10: 2013 1 1 624 630 -6 840 830 10 MQ 4599 N518MQ LGA MSP
# ---
# 8823: 2013 1 10 2038 2045 -7 2140 2154 -14 B6 1178 N640JB EWR BOS
# 8824: 2013 1 10 2040 2040 0 2351 2357 -6 B6 677 N809JB JFK LAX
# 8825: 2013 1 10 2054 2100 -6 2202 2207 -5 US 2144 N952UW LGA BOS
# 8826: 2013 1 10 2058 2100 -2 2229 2225 4 WN 530 N443WN LGA MDW
# 8827: 2013 1 10 2104 2110 -6 2337 2355 -18 B6 529 N507JB EWR MCO
# 8828: 2013 1 10 2129 2130 -1 148 218 -30 B6 701 N193JB JFK SJU
# 8829: 2013 1 10 2159 2159 0 2247 2300 -13 EV 4519 N13124 EWR BWI
# 8830: 2013 1 10 2320 2250 30 16 2354 22 B6 1018 N612JB JFK BOS
# 8831: 2013 1 10 NA 635 NA NA 940 NA AA 711 N3CDAA LGA DFW
# 8832: 2013 1 10 NA 700 NA NA 1007 NA UA 719 EWR DFW
# 5 variables not shown: [air_time <int>, distance <int>, hour <int>, minute <int>, time_hour <POSc>]
Run Code Online (Sandbox Code Playgroud)
year和month作为嵌套子目录在您的数据中,可索引字段可能是:
便利性和可用性之间需要取得平衡:如果一个分类变量有 20,000 个可能的值,那么可能太多,并且会损失很多效率。open_dataset在子目录中找到的目录/文件越多,在对其执行操作之前调用的时间就越长。我对此没有简单的衡量标准。
注意:人们也许可以使用write_dataset,它与open_dataset上面的内容相对应。partitions=它以同样的方式处理。但是,如果您不确定分区字段中的每个级别对于文件来说都是唯一的(例如,在我的示例数据中,month == 1所有 CSV 文件中都有),则读取的每个 CSV 文件都会覆盖之前写入的一些数据。在这种情况下,正如我将在此处演示的,我将手动写入子目录。
# ensures .parquet files are additive
addfile <- function(dat, base, by) {
thisdir <- do.call(file.path, as.list(c(base, paste(names(by), unname(by), sep = "="))))
dir.create(thisdir, recursive = TRUE, showWarnings = FALSE)
existing <- list.files(thisdir)
thisfile <- sprintf("%i.parquet", length(existing) + 1)
arrow::write_parquet(dat, file.path(thisdir, thisfile))
}
dir.create("datamart2")
for (fn in Sys.glob("csv/*.csv")) {
X <- fread(fn)
X[, addfile(.SD, "datamart2", by = .BY), by = .(year, month)]
rm(X)
gc() # optional
}
file.info(Sys.glob("datamart2/*/*/*"))
# size isdir mode mtime ctime atime exe
# datamart2/year=2013/month=1/1.parquet 133469 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=1/2.parquet 132760 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=1/3.parquet 134069 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=1/4.parquet 132404 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=1/5.parquet 136424 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=10/1.parquet 140490 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=10/2.parquet 139362 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=10/3.parquet 138570 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=10/4.parquet 137501 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=10/5.parquet 137426 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=11/1.parquet 133714 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=11/2.parquet 134291 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=11/3.parquet 133199 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=11/4.parquet 136152 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=11/5.parquet 133310 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=12/1.parquet 141743 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=12/2.parquet 142030 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=12/3.parquet 139573 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=12/4.parquet 140515 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=12/5.parquet 140059 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=2/1.parquet 126203 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=2/2.parquet 126481 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=2/3.parquet 126348 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=2/4.parquet 126618 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=2/5.parquet 123947 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=3/1.parquet 140691 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=3/2.parquet 142811 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=3/3.parquet 142415 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=3/4.parquet 140573 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=3/5.parquet 138510 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=4/1.parquet 140734 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=4/2.parquet 140707 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=4/3.parquet 140507 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=4/4.parquet 141896 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=4/5.parquet 141182 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=5/1.parquet 139517 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=5/2.parquet 140546 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=5/3.parquet 143193 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=5/4.parquet 139979 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=5/5.parquet 141259 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=6/1.parquet 143405 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=6/2.parquet 142591 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=6/3.parquet 142106 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=6/4.parquet 143012 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=6/5.parquet 141489 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=7/1.parquet 145064 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=7/2.parquet 143898 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=7/3.parquet 144104 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=7/4.parquet 146099 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=7/5.parquet 146616 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=8/1.parquet 145155 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=8/2.parquet 143314 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=8/3.parquet 145334 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=8/4.parquet 144581 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=8/5.parquet 145998 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
# datamart2/year=2013/month=9/1.parquet 135902 FALSE 666 2022-09-19 05:53:25 2022-09-19 05:53:25 2022-09-19 05:53:25 no
# datamart2/year=2013/month=9/2.parquet 135525 FALSE 666 2022-09-19 05:53:26 2022-09-19 05:53:26 2022-09-19 05:53:26 no
# datamart2/year=2013/month=9/3.parquet 136012 FALSE 666 2022-09-19 05:53:28 2022-09-19 05:53:28 2022-09-19 05:53:28 no
# datamart2/year=2013/month=9/4.parquet 137506 FALSE 666 2022-09-19 05:53:29 2022-09-19 05:53:29 2022-09-19 05:53:29 no
# datamart2/year=2013/month=9/5.parquet 133894 FALSE 666 2022-09-19 05:53:30 2022-09-19 05:53:30 2022-09-19 05:53:30 no
Run Code Online (Sandbox Code Playgroud)
读取数据与第 1 部分相同,但请注意索引year和month是最后两列而不是前两列:
library(dplyr)
library(arrow)
ds <- open_dataset("datamart2")
nrow(ds)
# [1] 336776
object.size(ds) # environment
# 504 bytes
with(ls.objects(envir = ds), sum(Size))
# [1] 155896
ds %>%
filter(month == 1, between(day, 1, 10))
# FileSystemDataset (query)
# day: int32
# dep_time: int32
# sched_dep_time: int32
# dep_delay: int32
# arr_time: int32
# sched_arr_time: int32
# arr_delay: int32
# carrier: string
# flight: int32
# tailnum: string
# origin: string
# dest: string
# air_time: int32
# distance: int32
# hour: int32
# minute: int32
# time_hour: timestamp[us, tz=UTC]
# year: int32
# month: int32
# * Filter: ((month == 1) and ((day >= 1) and (day <= 10)))
# See $.data for the source Arrow object
ds %>%
filter(month == 1, between(day, 1, 10)) %>%
collect()
# day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight tailnum origin dest air_time
# <int> <int> <int> <int> <int> <int> <int> <char> <int> <char> <char> <char> <int>
# 1: 1 517 515 2 830 819 11 UA 1545 N14228 EWR IAH 227
# 2: 1 542 540 2 923 850 33 AA 1141 N619AA JFK MIA 160
# 3: 1 544 545 -1 1004 1022 -18 B6 725 N804JB JFK BQN 183
# 4: 1 557 600 -3 838 846 -8 B6 79 N593JB JFK MCO 140
# 5: 1 558 600 -2 849 851 -2 B6 49 N793JB JFK PBI 149
# 6: 1 559 600 -1 854 902 -8 UA 1187 N76515 EWR LAS 337
# 7: 1 600 600 0 851 858 -7 B6 371 N595JB LGA FLL 152
# 8: 1 615 615 0 1039 1100 -21 B6 709 N794JB JFK SJU 182
# 9: 1 635 635 0 1028 940 48 AA 711 N3GKAA LGA DFW 248
# 10: 1 655 655 0 1021 1030 -9 DL 1415 N3763D JFK SLC 294
# ---
# 8823: 10 2038 2045 -7 2140 2154 -14 B6 1178 N640JB EWR BOS 40
# 8824: 10 2040 2040 0 2351 2357 -6 B6 677 N809JB JFK LAX 343
# 8825: 10 2054 2100 -6 2202 2207 -5 US 2144 N952UW LGA BOS 34
# 8826: 10 2058 2100 -2 2229 2225 4 WN 530 N443WN LGA MDW 117
# 8827: 10 2104 2110 -6 2337 2355 -18 B6 529 N507JB EWR MCO 127
# 8828: 10 2129 2130 -1 148 218 -30 B6 701 N193JB JFK SJU 186
# 8829: 10 2159 2159 0 2247 2300 -13 EV 4519 N13124 EWR BWI 33
# 8830: 10 2320 2250 30 16 2354 22 B6 1018 N612JB JFK BOS 35
# 8831: 10 NA 635 NA NA 940 NA AA