我一直在使用 Spark 进行一些数据分析和机器学习。
将一些数据读入为 trainDF 之后,我构造了两个逻辑上等价的管道(Pipeline),其中一个在末尾(lr 之前)多加了一个 VectorAssembler(只有一个输入列),用来演示由此带来的速度下降:
scala> val assembler = new VectorAssembler().setInputCols(Array("all_description_features")).setOutputCol("features")
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_a76e6412bc96
scala> val idfDescription = new IDF().setInputCol("all_description_hashed").setOutputCol("all_description_features")
idfDescription: org.apache.spark.ml.feature.IDF = idf_4b504cf08d86
scala> val descriptionArray = Array(tokensDescription, removerDescription, hashingTFDescription, idfDescription, assembler, lr)
descriptionArray: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}}] = Array(regexTok_316674b9209b, stopWords_8ecdf6f09955, hashingTF_48cf3f9cc065, idf_4b504cf08d86, vecAssembler_a76e6412bc96, logreg_f0763c33b304)
scala> val pipeline = new Pipeline().setStages(descriptionArray)
pipeline: org.apache.spark.ml.Pipeline = pipeline_4e462d0ee649
scala> time {pipeline.fit(trainDF)}
16/09/28 13:04:17 WARN Executor: 1 block locks were not …(日志输出在此处被截断)