将data.table列快速连接到一个字符串列中

Mat*_*ill 21 r concatenation data.table

给定a中列名的任意列表data.table,我想将这些列的内容连接成存储在新列中的单个字符串.我需要连接的列并不总是相同的,所以我需要生成表达式来动态执行.

我有一种潜在的怀疑,我使用这个eval(parse(...))电话的方式可以用更优雅的东西代替,但下面的方法是我迄今为止能够获得的最快的方法.

有1000万行,这个样本数据需要大约21.7秒(基数R paste0需要稍长一些 - 23.6秒).我的实际数据有18-20列被连接,最多有1亿行,因此减速变得更加不切实际.

有什么想法可以加快速度吗?


目前的方法

library(data.table)
library(stringi)

RowCount <- 1e7
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- c("x","a","b","c","d","e","f","y")
PasteStatement <- stri_c('stri_c(',stri_c(ConcatCols,collapse = ","),')')
print(PasteStatement)
Run Code Online (Sandbox Code Playgroud)

[1] "stri_c(x,a,b,c,d,e,f,y)"
Run Code Online (Sandbox Code Playgroud)

然后用于使用以下表达式连接列:

DT[,State := eval(parse(text = PasteStatement))]
Run Code Online (Sandbox Code Playgroud)

输出样本:

     x   y a b c d e f        State
1: foo bar 4 8 3 6 9 2 foo483692bar
2: foo bar 8 4 8 7 8 4 foo848784bar
3: foo bar 2 6 2 4 3 5 foo262435bar
4: foo bar 2 4 2 4 9 9 foo242499bar
5: foo bar 5 9 8 7 2 7 foo598727bar
Run Code Online (Sandbox Code Playgroud)

分析结果

火焰图 数据


更新1: ,fread,fwritesed

按照@Gregor的建议,尝试使用sed在磁盘上进行连接.由于data.table的快速freadfwrite功能,我能够将列写入磁盘,使用sed消除逗号分隔符,然后在大约18.3秒内回读后处理输出- 不够快切换,但仍然是一个有趣的切线!

ConcatCols <- c("x","a","b","c","d","e","f","y")
fwrite(DT[,..ConcatCols],"/home/xxx/DT.csv")
system("sed 's/,//g' /home/xxx/DT.csv > /home/xxx/DT_Post.csv ")
Post <- fread("/home/xxx/DT_Post.csv")
DT[,State := Post[[1]]]
Run Code Online (Sandbox Code Playgroud)

18.3整体秒的细分(无法使用profvis,因为sedR profiler是不可见的)

  • data.table::fwrite() - 0.5秒
  • sed- 14.8秒
  • data.table::fread() - 3.0秒
  • := - 0.0秒

如果不出意外,这证明了data.table作者对磁盘IO性能优化的广泛工作.(我使用的是1.10.5开发版,增加了多线程来fread,fwrite已经多线程了一段时间).

需要注意的是:如果有一个解决方法来使用fwrite@Gregor在下面的另一个评论中建议使用和空白分隔符,那么这种方法可以合理地减少到~3.5秒!

更新这个切线:forked data.table并注释掉需要大于长度0的分隔符的行,神秘地得到了一些空格?在引起一些试图弄乱C内部的段错误之后,我暂时把它放在冰上.理想的解决方案不需要写入磁盘并将所有内容保存在内存中.


更新2:sprintf针对整数特定情况

这里的第二次更新:虽然我在原始用法示例中包含了字符串,但我的实际用例专门连接整数值(根据上游清理步骤,始终可以假定为非空值).

由于用例非常具体并且与原始问题不同,因此我不会直接将时序与之前发布的时间进行比较.然而,有一点需要注意的是,尽管stringi很好地处理了许多字符编码格式,混合矢量类型而不需要指定它们,并且开箱即可进行一堆错误处理,这确实增加了一些时间(这对于大多数情况来说可能是值得的).

通过使用基本R的sprintf功能并让它知道所有输入都是整数,我们可以减少大约30%的运行时间为500万行,并计算18个整数列.(20.3秒而不是28.9)

library(data.table)
library(stringi)
RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)

## Using stringi::stri_c ---------------------------------------------------
stri_joinStatement <- stri_c('stri_join(',stri_c(ConcatCols,collapse = ","),', sep="", collapse=NULL, ignore_null=TRUE)')
DT[, State := eval(parse(text = stri_joinStatement))]

## Using sprintf -----------------------------------------------------------
sprintfStatement <- stri_c("sprintf('",stri_flatten(rep("%i",length(ConcatCols))),"', ",stri_c(ConcatCols,collapse = ","),")")
DT[,State_sprintf_i := eval(parse(text = sprintfStatement))]
Run Code Online (Sandbox Code Playgroud)

生成的语句如下:

> cat(stri_joinStatement)
stri_join(a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f, sep="", collapse=NULL, ignore_null=TRUE)
> cat(sprintfStatement)
sprintf('%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i%i', a,b,c,d,e,f,a,b,c,d,e,f,a,b,c,d,e,f)
Run Code Online (Sandbox Code Playgroud)

的sprintf


更新3:R不一定要慢.

基于@MartinModrák的答案,我根据一些data.table专门针对专门的"单个数字整数"案例的内部构建了一个单一技巧的小马包:fastConcat.(不要在CRAN上很快找到它,但是你可以通过安装github repo,msummersgill/fastConcat来自担风险使用它.)

这也许可以被人谁了解多少进一步提高c更好,但现在,它在运行相同的情况下,在更新2 2.5秒 -围绕8倍的速度比sprintf()11.5倍比快stringi::stri_c(),我原来用的方法.

对我来说,这突出了一些最简单的操作的性能改进的巨大机会,R 如最基本的字符串向量连接和更好的调整c.我想像@Matt Dowle这样的人已经看过多年了 - 只要他有时间重写所有内容R,而不仅仅是data.frame.

fastConcat


Mar*_*rák 12

C来救援!

从data.table中窃取一些代码我们可以编写一个工作速度更快的C函数(并且可以并行化甚至更快).

首先确保你有一个有效的C++工具链:

library(inline)

fx <- inline::cfunction( signature(x = "integer", y = "numeric" ) , '
    return ScalarReal( INTEGER(x)[0] * REAL(y)[0] ) ;
' )
fx( 2L, 5 ) #Should return 10
Run Code Online (Sandbox Code Playgroud)

那么这应该工作(假设只有整数的数据,但代码可以扩展到其他类型):

library(inline)
library(data.table)
library(stringi)

header <- "

//Taken from https://github.com/Rdatatable/data.table/blob/master/src/fwrite.c
static inline void reverse(char *upp, char *low)
{
  upp--;
  while (upp>low) {
  char tmp = *upp;
  *upp = *low;
  *low = tmp;
  upp--;
  low++;
  }
}

void writeInt32(int *col, size_t row, char **pch)
{
  char *ch = *pch;
  int x = col[row];
  if (x == INT_MIN) {
  *ch++ = 'N';
  *ch++ = 'A';
  } else {
  if (x<0) { *ch++ = '-'; x=-x; }
  // Avoid log() for speed. Write backwards then reverse when we know how long.
  char *low = ch;
  do { *ch++ = '0'+x%10; x/=10; } while (x>0);
  reverse(ch, low);
  }
  *pch = ch;
}

//end of copied code 

"



 worker_fun <- inline::cfunction( signature(x = "list", preallocated_target = "character", columns = "integer", start_row = "integer", end_row = "integer"), includes = header , "
  const size_t _start_row = INTEGER(start_row)[0] - 1;
  const size_t _end_row = INTEGER(end_row)[0];

  const int max_out_len = 256 * 256; //max length of the final string
  char buffer[max_out_len];
  const size_t num_elements = _end_row - _start_row;
  const size_t num_columns = LENGTH(columns);
  const int * _columns = INTEGER(columns);

  for(size_t i = _start_row; i < _end_row; ++i) {
    char *buf_pos = buffer;
    for(size_t c = 0; c < num_columns; ++c) {
      if(c > 0) {
        buf_pos[0] = ',';
        ++buf_pos;
      }
      writeInt32(INTEGER(VECTOR_ELT(x, _columns[c] - 1)), i, &buf_pos);
    }
    SET_STRING_ELT(preallocated_target,i, mkCharLen(buffer, buf_pos - buffer));
  }
return preallocated_target;
" )

#Test with the same data

RowCount <- 5e6
DT <- data.table(x = "foo",
                 y = "bar",
                 a = sample.int(9, RowCount, TRUE),
                 b = sample.int(9, RowCount, TRUE),
                 c = sample.int(9, RowCount, TRUE),
                 d = sample.int(9, RowCount, TRUE),
                 e = sample.int(9, RowCount, TRUE),
                 f = sample.int(9, RowCount, TRUE))

## Generate an expression to paste an arbitrary list of columns together
ConcatCols <- list("a","b","c","d","e","f")
## Do it 3x as many times
ConcatCols <- c(ConcatCols,ConcatCols,ConcatCols)


ptm <- proc.time()
preallocated_target <- character(RowCount)
column_indices <- sapply(ConcatCols, FUN = function(x) { which(colnames(DT) == x )})
x <- worker_fun(DT, preallocated_target, column_indices, as.integer(1), as.integer(RowCount))
DT[, State := preallocated_target]
proc.time() - ptm
Run Code Online (Sandbox Code Playgroud)

虽然您的(仅限整数)示例在我的PC上运行大约20秒,但运行时间约为5秒,并且可以轻松并行化.

有些事情需要注意:

  • 代码不是生产就绪 - 应该对函数输入进行大量的健全性检查(特别是检查所有列的长度是否相同,检查列类型,preallocated_target大小等)
  • 该函数将其输出放入预分配的字符向量,这是非标准和丑陋的(R通常没有传递引用语义)但允许并行化(见下文).
  • 最后两个参数是要处理的起始行和结束行,再次,这是用于并行化
  • 该函数接受列索引而不是列名.所有列都必须是整数类型.
  • 除输入data.table和preallocated_target外,输入必须是整数
  • 不包括该函数的编译时间(因为您应该事先编译它 - 甚至可以创建一个包)

并行

编辑:由于方式clusterExport和R字符串存储工作,下面的方法实际上会失败.因此,可能需要在C中进行并行化,类似于在data.table中实现的方式.

由于您无法跨R进程传递内联编译函数,因此并行化需要更多工作.为了能够并行使用上述函数,您需要使用R编译器单独编译它并使用dyn.loadOR将其包装在一个包中或使用forking后端进行并行(我没有一个,forking仅适用于UNIX) .

并行运行会看起来像(未经测试):

no_cores <- detectCores()

# Initiate cluster
cl <- makeCluster(no_cores)

#Preallocated target and prepare params
num_elements <- length(DT[[1]])
preallocated_target <- character(num_elements)
block_size <- 4096 #No of rows processed at once. Adjust for best performance
column_indices <- sapply(ConcatCols, FUN = function(x) { which(colnames(DT) == x )})

num_blocks <- ceiling(num_elements / block_size)

clusterExport(cl, 
   c("DT","preallocated_target","column_indices","num_elements", "block_size"))
clusterEvalQ(cl, <CODE TO LOAD THE NATIVE FUNCTION HERE>)

parLapply(cl, 1:num_blocks ,
          function(block_id)
          {
            throw_away <- 
              worker_fun(DT, preallocated_target, columns, 
              (block_id - 1) * block_size + 1, min(num_elements, block_id * block_size - 1))
            return(NULL)
          })



stopCluster(cl)
Run Code Online (Sandbox Code Playgroud)

  • 老实说这是我第一次为R编写C代码的经验,所以我也不知道在包中使用C代码有什么必要:-)尽管很有趣.请注意,内联可以为您提供完整的源代码(只要有编译时错误就会显示它).我相信OpenMP的东西和其他列类型的writeXX函数可以很容易地从data.table中的fwrite.c中获取,只需要很少的修改就能使它真正起作用. (3认同)

tal*_*lat 8

我不知道样本数据对于您的实际数据有多具代表性,但是对于您的采样数据,只需连接一次ConcatCol的每个独特组合而不是多次,就可以实现显着的性能提升.

这意味着对于样本数据,如果你也做了所有的重复,你会看到~500k级联,而不是1000万.

请参阅以下代码和时序示例:

system.time({
  setkeyv(DT, ConcatCols)
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
})
#       user      system     elapsed 
#      7.448       0.462       4.618 
Run Code Online (Sandbox Code Playgroud)

大约一半的时间花在了setkey部件上.如果您的数据已经被键入,则时间会进一步减少到超过2秒.

setkeyv(DT, ConcatCols)
system.time({
  DTunique <- unique(DT[, ConcatCols, with=FALSE], by = key(DT))
  DTunique[, State :=  do.call(paste, c(DTunique, sep = ""))]
  DT[DTunique, State := i.State, on = ConcatCols]
})
#       user      system     elapsed 
#      2.526       0.280       2.181 
Run Code Online (Sandbox Code Playgroud)

  • 这也是一个很好的答案!我的实际数据通常只有大约10,000种独特的组合(无论我处理100万还是1亿),所以这对我的应用来说是一种非常有效的方法.使用更具代表性的数据集(1000万行,18列,9216个唯一组合),此方法在**5.2秒**内执行,比定制函数的**3.4秒**运行时间慢_slightly_慢基于@MartinModrák的回答,在[msummersgill/fastConcat](https://github.com/msummersgill/fastConcat)中的`fastConcat :: concat()`. (2认同)