使用Spark MLlib Scala API按组运行3000+随机森林模型

SH *_* Y. 6 scala r apache-spark apache-spark-mllib

我正在尝试使用Spark Scala API在大型模型输入csv文件上按组(School_ID,超过3,000)构建随机森林模型.每组包含约3000-4000条记录.我拥有的资源是20-30 aws m3.2xlarge实例.

在R中,我可以按组构建模型并将它们保存到这样的列表中 -

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>% 
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))
Run Code Online (Sandbox Code Playgroud)

列表可以存储在某个地方,我可以在需要使用它们时调用它们,如下所示 -

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)
Run Code Online (Sandbox Code Playgroud)

我想知道如何在Spark中做到这一点,无论我是否需要首先按组分割数据以及如何在必要时有效地进行分割.

我能够根据以下代码通过School_ID拆分文件,但似乎它为每次迭代创建了一个单独的作业子集,并且需要很长时间才能完成作业.有没有办法一次性完成?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}
Run Code Online (Sandbox Code Playgroud)

来源: 如何在SCALA和SPARK中将数据框拆分为具有相同列值的数据框

编辑8/24/2015 我正在尝试将我的数据帧转换为随机林模型接受的格式.我正在遵循此线程上的说明 如何在Spark ML中创建用于分类的正确数据帧

基本上,我创建了一个新变量"label"并将我的类存储在Double中.然后我使用VectorAssembler函数组合我的所有功能,并将我的输入数据转换如下 -

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")
Run Code Online (Sandbox Code Playgroud)

部分错误消息(如果您需要完整的日志消息,请告诉我) -

scala.MatchError:org.apache.spark.ml.feature.VectorAssembler上的StringType(类org.apache.spark.sql.types.StringType $)$ anonfun $ 2.apply(VectorAssembler.scala:57)

将所有变量转换为数字类型后解决此问题.

编辑8/25/2015 ml模型不接受我手动编码的标签,因此我需要使用StringIndexer来解决此处所示的问题.根据官方文档,最常见的标签为0.它会导致School_ID上的标签不一致.我想知道是否有办法创建标签而不重置值的顺序.

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")
Run Code Online (Sandbox Code Playgroud)

任何建议或指示都会有所帮助,随时提出任何问题.谢谢!

zer*_*323 6

由于您已经为每所学校分别设置了数据框,因此这里没有太多工作要做.由于你假设你想使用数据帧ml.classification.RandomForestClassifier.如果是这样,你可以尝试这样的事情:

  1. 提取管道逻辑.RandomForestClassifier根据您的要求调整参数和变压器

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    
    def trainModel(df: DataFrame): PipelineModel = {
       val rf  = new RandomForestClassifier()
       val pipeline = new Pipeline().setStages(Array(rf))
       pipeline.fit(df)
    }
    
    Run Code Online (Sandbox Code Playgroud)
  2. 在每个子集上训练模型

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
    
    Run Code Online (Sandbox Code Playgroud)
  3. 保存模型

    import java.io._
    
    def saveModel(name: String, model: PipelineModel) = {
      val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
      oos.writeObject(model)
      oos.close
    }
    
    schools.zip(bySchoolArrayModels).foreach{
      case (name, model) => saveModel(name, Model)
    }
    
    Run Code Online (Sandbox Code Playgroud)
  4. 可选:由于个别子是相当小的,你可以尝试一个类似于我已经描述了一种方法这里来同时提交多个任务.

  5. 如果使用mllib.tree.model.RandomForestModel,可以省略3. model.save直接使用 .因为反序列化似乎存在一些问题(如何在spark.ml中反序列化管道模型? - 据我所知,它可以正常工作但比安慰更安全,我猜)它可能是一种首选方法.

编辑

根据官方文件:

VectorAssembler 接受以下输入列类型:所有数字类型,布尔类型和矢量类型.

由于错误表明您的列是a,因此String您应首先对其进行转换,例如使用StringIndexer.