通过重复调用内存中的数据帧来减速

hgb*_*234 9 r apache-spark apache-spark-ml sparklyr

假设我有40个连续(DoubleType)变量,我已经使用了四分位数ft_quantile_discretizer.识别所有变量的四分位数非常快,因为该函数支持一次执行多个变量.

接下来,我想要一个热门代码那些分段变量,但是目前没有一个热代码支持所有这些变量的功能.所以我通过循环遍历变量,一次一个地管道ft_string_indexer,ft_one_hot_encodersdf_separate_column为每个分段变量.这可以完成工作.但是,随着循环的进行,它会大大减慢.我认为它的内存不足,但无法弄清楚如何编程,以便它以相同的速度在变量上执行.

如果q_vars是连续变量的变量名称(例如40个)的字符数组,我该如何以更加火花的方式对其进行编码?

for (v in q_vars) {
   data_sprk_q<-data_sprk_q %>% 
       ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
       ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
       sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]]) 
}
Run Code Online (Sandbox Code Playgroud)

我也尝试将所有引用的变量作为单个大型管道执行,但这也没有解决问题,所以我认为它与循环本身没有任何关系.

test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
Run Code Online (Sandbox Code Playgroud)

任何帮助,将不胜感激.

use*_*411 8

通常情况下,由于Catalyst优化器的线性复杂性比线性更差,因此预计会出现长ML管线的一些(有时是实质性的)减速.如果没有将流程分成多个流水线,并打破其间的血统(使用检查点并将数据写入持久性存储并将其加载回来),那么目前您无法做到这一点.

但是,您当前的代码在此基础上添加了许多问题:

  • 除非您使用超过10个桶 StringIndexer

    ft_string_indexer(v ,paste0(v, "b"), "keep", string_order_type = "alphabetAsc")
    
    Run Code Online (Sandbox Code Playgroud)

    只需复制指定的标签QuantileDiscretizer.在使用词典顺序时,使用更多级别的行为变得更加有用.

  • 可能根本不需要应用单热编码(并且在最坏的情况下可能是有害的),这取决于下游过程,即使使用线性模型,也可能不是绝对必要的(您可能认为分配的标签是有效的)序数,记录为名义值,增加维数不是理想的结果).

  • 然而,最大的问题是应用sdf_separate_column.它

    • 通过增加表达式的数量来增加计算执行计划的成本.
    • 通过将稀疏数据转换为密集数据来增加处理所需的内存量.
    • 内部sparklyr使用UserDefinedFunction每个索引,有效地导致同一行的重新分配,解码和垃圾收集给集群带来很大压力.
    • 最后但并非最不重要的是,它丢弃了Spark ML广泛使用的列元数据.

    我强烈建议不要在这里使用此功能.根据您的评论,您似乎希望在将结果传递给其他算法之前对列进行子集化 - 您可以使用它VectorSlicer.

总的来说,你可以重写你的管道

set.seed(1)

df <- copy_to(sc, tibble(x=rnorm(100), y=runif(100), z=rpois(100, 1)))

input_cols <- colnames(df)
discretized_cols <- paste0(input_cols, "_d")
encoded_cols <- paste0(discretized_cols, "_e") %>% setNames(discretized_cols)

discretizer <- ft_quantile_discretizer(
  sc, input_cols = input_cols, output_cols = discretized_cols, num_buckets = 10
)
encoders <- lapply(
  discretized_cols, 
  function(x) ft_one_hot_encoder(sc, input_col=x, output_col=encoded_cols[x])
)

transformed_df <- do.call(ml_pipeline, c(list(discretizer), encoders)) %>%
  ml_fit(df) %>% 
  ml_transform(df)
Run Code Online (Sandbox Code Playgroud)

ft_vector_slicer在需要时申请.例如,从x您可以获取与第一个,第三个和第六个桶对应的值:

transformed_df %>% 
  ft_vector_slicer(
    input_col="x_d_e", output_col="x_d_e_s", indices=c(0, 2, 5)) 
Run Code Online (Sandbox Code Playgroud)