如何为ML算法矢量化DataFrame列?

fas*_*jhn 6 scala apache-spark apache-spark-ml apache-spark-mllib

有一个带有一些分类字符串值的DataFrame(例如uuid | url | browser).

我想将它转换为double来执行接受双矩阵的ML算法.

作为转换方法,我使用StringIndexer(spark 1.4)将我的字符串值映射到double值,所以我定义了一个这样的函数:

def str(arg: String, df:DataFrame) : DataFrame =
   (
    val indexer = new StringIndexer().setInputCol(arg).setOutputCol(arg+"_index")
    val newDF = indexer.fit(df).transform(df)
    return newDF
   )
Run Code Online (Sandbox Code Playgroud)

现在问题是我将迭代df的foreach列,调用此函数并在解析的双列中添加(或转换)原始字符串列,因此结果将是:

初始df:

[String: uuid|String: url| String: browser]
Run Code Online (Sandbox Code Playgroud)

最终df:

[String: uuid|Double: uuid_index|String: url|Double: url_index|String: browser|Double: Browser_index]
Run Code Online (Sandbox Code Playgroud)

提前致谢

zer*_*323 11

你可以简单地foldLeft通过Array列:

val transformed: DataFrame = df.columns.foldLeft(df)((df, arg) => str(arg, df))
Run Code Online (Sandbox Code Playgroud)

不过,我认为这不是一个好方法.由于src丢弃,StringIndexerModel因此在获取新数据时无法使用它.因此,我建议使用Pipeline:

import org.apache.spark.ml.Pipeline

val transformers: Array[org.apache.spark.ml.PipelineStage] = df.columns.map(
   cname => new StringIndexer()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val stages: Array[org.apache.spark.ml.PipelineStage] = transformers ++ ???

val pipeline = new Pipeline().setStages(stages)
val model = pipeline.fit(df)
model.transform(df)
Run Code Online (Sandbox Code Playgroud)

VectorAssembler 可以像这样包括:

val assembler  = new VectorAssembler()
    .setInputCols(df.columns.map(cname => s"${cname}_index"))
    .setOutputCol("features")

val stages = transformers :+ assembler
Run Code Online (Sandbox Code Playgroud)

你也可以使用RFormula,它不那么可定制,但更简洁:

import org.apache.spark.ml.feature.RFormula

val rf = new RFormula().setFormula(" ~ uuid + url + browser - 1")
val rfModel = rf.fit(dataset)
rfModel.transform(dataset)
Run Code Online (Sandbox Code Playgroud)