将IndexToString应用于Spark中的特征向量

gst*_*lvr 5 scala apache-spark apache-spark-ml

上下文:我有一个数据框,其中所有分类值都已使用StringIndexer编制索引.

val categoricalColumns = df.schema.collect { case StructField(name, StringType, nullable, meta) => name }    

val categoryIndexers = categoricalColumns.map {
  col => new StringIndexer().setInputCol(col).setOutputCol(s"${col}Indexed") 
}
Run Code Online (Sandbox Code Playgroud)

然后我使用VectorAssembler来矢量化所有要素列(包括索引的分类列).

val assembler = new VectorAssembler()
    .setInputCols(dfIndexed.columns.diff(List("label") ++ categoricalColumns))
    .setOutputCol("features")
Run Code Online (Sandbox Code Playgroud)

应用分类器和一些额外的步骤后,我最终得到一个具有标签,功能和预测的数据框.我想将我的特征向量扩展为单独的列,以便将索引值转换回原始的String形式.

val categoryConverters = categoricalColumns.zip(categoryIndexers).map {
colAndIndexer => new IndexToString().setInputCol(s"${colAndIndexer._1}Indexed").setOutputCol(colAndIndexer._1).setLabels(colAndIndexer._2.fit(df).labels)
}
Run Code Online (Sandbox Code Playgroud)

问题:是否有一种简单的方法可以做到这一点,或者是以某种方式将预测列附加到测试数据框的最佳方法?

我尝试过的:

val featureSlicers = categoricalColumns.map {
  col => new VectorSlicer().setInputCol("features").setOutputCol(s"${col}Indexed").setNames(Array(s"${col}Indexed"))
}
Run Code Online (Sandbox Code Playgroud)

应用这个给了我想要的列,但是它们是Vector形式的(因为它意味着这样做)而不是Double类型.

编辑: 所需的输出是原始数据框(即分类特征为字符串而非索引),附加列指示预测标签(在我的情况下为0或1).

例如,假设我的分类器的输出看起来像这样:

+-----+---------+----------+
|label| features|prediction|
+-----+---------+----------+
|  1.0|[0.0,3.0]|       1.0|
+-----+---------+----------+
Run Code Online (Sandbox Code Playgroud)

通过在每个功能上应用VectorSlicer,我会得到:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|        [0.0]|        [3.0]|
+-----+---------+----------+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

哪个好,但我需要:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|         0.0 |         3.0 |
+-----+---------+----------+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

然后能够使用IndexToString并将其转换为:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|    status   |    artist   |
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|        good |  Pink Floyd |
+-----+---------+----------+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

甚至:

+-----+----------+-------------+-------------+
|label|prediction|    status   |    artist   |
+-----+----------+-------------+-------------+
|  1.0|       1.0|        good |  Pink Floyd |
+-----+----------+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

zer*_*323 4

嗯,这不是一个非常有用的操作,但应该可以使用列元数据和简单的 UDF 来提取所需的信息。我假设您的数据已经创建了类似于此的管道:

import org.apache.spark.ml.feature.{VectorSlicer, VectorAssembler, StringIndexer}
import org.apache.spark.ml.Pipeline

val df = sc.parallelize(Seq(
  (1L, "a", "foo", 1.0), (2L, "b", "bar", 2.0), (3L, "a", "bar", 3.0)
)).toDF("id", "x1", "x2", "x3")

val featureCols = Array("x1", "x2", "x3")
val featureColsIdx = featureCols.map(c => s"${c}_i")

val indexers = featureCols.map(
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_i")
)

val assembler = new VectorAssembler()
  .setInputCols(featureColsIdx)
  .setOutputCol("features")

val slicer = new VectorSlicer()
  .setInputCol("features")
  .setOutputCol("string_features")
  .setNames(featureColsIdx.init)


val transformed = new Pipeline()
  .setStages(indexers :+ assembler :+ slicer)
  .fit(df)
  .transform(df)
Run Code Online (Sandbox Code Playgroud)

首先我们可以从特征中提取所需的元数据:

val meta = transformed.select($"string_features")
  .schema.fields.head.metadata
  .getMetadata("ml_attr") 
  .getMetadata("attrs")
  .getMetadataArray("nominal")
Run Code Online (Sandbox Code Playgroud)

并将其转换为更易于使用的东西

case class NominalMetadataWrapper(idx: Long, name: String, vals: Array[String])

// In general it could a good idea to make it a broadcast variable
val lookup = meta.map(m => NominalMetadataWrapper(
  m.getLong("idx"), m.getString("name"), m.getStringArray("vals")
))
Run Code Online (Sandbox Code Playgroud)

最后是一个小的UDF:

import scala.util.Try

val transFeatures = udf((v: Vector) => lookup.map{
  m => Try(m.vals(v(m.idx.toInt).toInt)).toOption
})

transformed.select(transFeatures($"string_features")).
Run Code Online (Sandbox Code Playgroud)