How to train a random forest with a sparse matrix in Spark?

ℕʘʘ*_*ḆḽḘ · 5 · tags: r, apache-spark, apache-spark-ml, apache-spark-mllib, sparklyr

Consider this simple example that uses sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

# `sc` below is assumed to be an active Spark connection, e.g.:
# sc <- spark_connect(master = "local")

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

The data frame is fairly small (about 70k rows and 14k unique words).
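If you want to sanity-check those figures, here is a minimal sketch (tidytext is an assumption here, and its tokenizer differs from the regex tokenizer used below, so the word count is only approximate):

sdf_nrow(mytext_spark) # row count of the Spark DataFrame, ~70k

library(tidytext) # assumed helper, not used elsewhere in this post
austen_books() %>%
  unnest_tokens(word, text) %>%
  distinct(word) %>%
  nrow() # roughly 14k distinct words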

Now, training a naive bayes model on my cluster takes only a couple of seconds. First, I define the pipeline:

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes(label_col = "label", 
                 features_col = "finaltoken", 
                 prediction_col = "pcol",
                 probability_col = "prcol", 
                 raw_prediction_col = "rpcol",
                 model_type = "multinomial", 
                 smoothing = 0, 
                 thresholds = c(1, 1))

Then I train the naive bayes model:

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3
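As a quick usage check, the fitted PipelineModel can be applied back to the data with ml_transform (a sketch; the column names such as pcol come from the pipeline definition above):

predictions <- ml_transform(model, mytext_spark)
predictions %>% 
  select(label, pcol) %>% 
  head()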

Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc.) on the same (actually tiny!!) dataset just won't work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken', 
                     pattern = "\\s+", 
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier(label_col = "label", 
                    features_col = "finaltoken", 
                    prediction_col = "pcol",
                    probability_col = "prcol", 
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 10240,
                    cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# won't work :(

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
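The same failure shows up with a random forest, the model the title asks about; a sketch with default hyperparameters (only the final pipeline stage changes):

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_random_forest_classifier(label_col = "label",
                              features_col = "finaltoken",
                              prediction_col = "pcol")

model3 <- ml_fit(pipeline3, mytext_spark) # fails with the same error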

I think this is due to the sparsity of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A spark problem? Is my code inefficient?

Thanks!

eli*_*sah · 5

You are getting this error because you are actually hitting the famous 2G limit that we have in Spark: https://issues.apache.org/jira/browse/SPARK-6235

The solution is to repartition your data before feeding it to the algorithm.

There are actually two gotchas in this post:

  • Working with local data.
  • Tree-based models in Spark are memory hungry.

So, let's review your code, which seems harmless:

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

So what does that last line do?

copy_to (which is not designed for big data sets) actually just copies the local R data frame into a 1-partition Spark DataFrame.
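You can verify that directly; a quick sketch, assuming the mytext_spark from the question:

sdf_num_partitions(mytext_spark)
# [1] 1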

So all you need to do is repartition your data so that, once the pipeline has prepared it before feeding it into gbt, the partition size is less than 2GB.

So you just have to do the following to repartition your data:

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb is the amount of memory you are giving gbt to compute its statistics. It is not related directly to the amount of data you feed in.

PS2: If you didn't set up enough memory on your executors, you might run into java.lang.OutOfMemoryError: GC overhead limit exceeded.
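For reference, executor memory is set on the connection config before connecting; a minimal sketch (the 8g value and the master are assumptions to adapt to your cluster):

config <- spark_config()
config$spark.executor.memory <- "8g" # assumed value; size it to your cluster
sc <- spark_connect(master = "yarn", config = config) # master is an assumption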

EDIT: What does repartitioning the data mean?

Before talking about repartitioning, we can always refer to the definition of a partition. I'll try to be short.

A partition is a logical chunk of a large distributed data set.

Spark manages data using partitions, which helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions that hold the data chunks.

Increasing the number of partitions will make each partition have less data (or none at all!).

source: excerpt from @JacekLaskowski's book Mastering Apache Spark

But data isn't always partitioned appropriately, as in this case. Hence the need for repartitioning (sdf_repartition in sparklyr).

sdf_repartition will scatter and shuffle your data across the nodes, i.e. sdf_repartition(20) will create 20 partitions of your data instead of the 1 it originally had.
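You can confirm the effect the same way as before (sketch, using the repartitioned mytext_spark from above):

sdf_num_partitions(mytext_spark)
# [1] 20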

I hope this helps.

The whole code:

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- 
  copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
  sdf_repartition(partitions = 20)