How to use Spark QuantileDiscretizer on multiple columns

sra*_*m24 4 dictionary pipeline scala quantile apache-spark

All,

I have an ML pipeline set up as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the names of the continuous features and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

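When I run this, Spark seems to set up each discretizer as a separate job. Is there a way to run all the discretizers as a single job, with or without a pipeline? Thanks for the help, much appreciated.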

小智 5

Spark 2.3.0 added support for this. See the release notes:

  • Multiple column support for several feature transformers:
    • [SPARK-13030]: OneHotEncoderEstimator (Scala/Java/Python)
    • [SPARK-22397]: QuantileDiscretizer (Scala/Java)
    • [SPARK-20542]: Bucketizer (Scala/Java/Python)

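You can now use setInputCols and setOutputCols to specify multiple columns, although this does not yet seem to be reflected in the official documentation. Compared with handling the columns one job at a time, this patch greatly improves performance.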

Your example can be modified as follows:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the names of the continuous features and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)

val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBuckets(3)

val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
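If different columns need different bucket counts, or you want to look at the split points that were learned, the following is a minimal sketch on a tiny made-up dataset. The names `small`, `a`, `b`, and `multi` are hypothetical and only for illustration. It assumes Spark 2.3.0 or later, where, as far as I know, `setNumBucketsArray` on QuantileDiscretizer and `getSplitsArray` on the fitted Bucketizer are part of the same multi-column support. The quantiles are approximate, so the exact split values may vary.

```scala
import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multi-col-discretizer-sketch")
  .getOrCreate()
import spark.implicits._

// Small hypothetical dataset with two continuous columns.
val small = Seq(
  (0.10, 10.0), (0.40, 20.0), (0.50, 30.0),
  (0.70, 40.0), (0.90, 50.0), (0.95, 60.0)
).toDF("a", "b")

// One estimator for both columns, with a separate bucket count per column.
// Only numBucketsArray is set here; numBuckets is left untouched.
val multi = new QuantileDiscretizer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("a_disc", "b_disc"))
  .setNumBucketsArray(Array(2, 3)) // 2 buckets for "a", 3 for "b"

// Fitting a QuantileDiscretizer yields a Bucketizer holding the learned splits.
val bucketizer: Bucketizer = multi.fit(small)

// Print the split points per input column (one array of splits per column).
multi.getInputCols.zip(bucketizer.getSplitsArray).foreach { case (name, splits) =>
  println(s"$name: ${splits.mkString("[", ", ", "]")}")
}

// Apply all bucketings in a single transform.
val bucketed = bucketizer.transform(small)
bucketed.show()
```

Because the result of `fit` is an ordinary Bucketizer, it can also be placed in a Pipeline as a single stage, just like the discretizer above.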