How do I create a custom Transformer from a UDF?

Alb*_*nto · scala · user-defined-functions · apache-spark · apache-spark-sql · apache-spark-ml

I am trying to create and save a Pipeline with custom stages. I need to add a column to my DataFrame using a UDF, so I'd like to know whether it is possible to convert a UDF (or any similar action) into a Transformer.

My custom UDF looks like this, and I'd like to learn how to build a custom Transformer that uses it:

def getFeatures(n: String) = {
  val NUMBER_FEATURES = 4
  // Keep only the first word, lower-cased
  val name = n.split(" +")(0).toLowerCase
  // Collect the suffixes of length 1 to NUMBER_FEATURES that fit within the name
  (1 to NUMBER_FEATURES)
    .filter(size => size <= name.length)
    .map(size => name.substring(name.length - size))
}

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
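For context, I can already apply the registered UDF directly; a minimal sketch (the k and v column names are just examples):

import sqlContext.implicits._

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
df.withColumn("vs", tokenizeUDF(df("v"))).show

This works for one-off transformations, but it is not a PipelineStage, so it cannot be part of a saved Pipeline.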

zer*_*323

It is not a fully featured solution, but you can start with something like this:

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], NGramTokenizer]  {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = {
    getFeatures _
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  override protected def outputDataType: DataType = {
    new ArrayType(StringType, true)
  }
}

A quick check:

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+

You can even try to generalize it to something like this:

import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
  override val uid: String,
  f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]]  {

  override protected def createTransformFunc: T => U = f

  override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == schemaFor[T].dataType)

  override protected def outputDataType: DataType = schemaFor[U].dataType
}

val transformer = new UnaryUDFTransformer("featurize", getFeatures)
  .setInputCol("v")
  .setOutputCol("vs")
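The same quick check as above should yield identical results, since it wraps the same function:

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+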

If you want to use a UDF, not the wrapped function, you'll have to extend Transformer directly and override the transform method. Unfortunately, the majority of the useful classes are private, so it can be rather tricky.
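For illustration, here is a minimal sketch of that approach, reusing getFeatures from above and assuming the Spark 2.x API; the column names are hard-coded to keep it short, whereas a real implementation would expose them as Params:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

class GetFeaturesTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("getFeaturesTransformer"))

  // Hard-coded for brevity; a full implementation would declare Params
  private val inputCol = "v"
  private val outputCol = "vs"

  // Wrap the plain function as a UDF once, at construction time
  private val featurize = udf(getFeatures _)

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn(outputCol, featurize(col(inputCol)))

  override def transformSchema(schema: StructType): StructType =
    schema.add(StructField(outputCol, ArrayType(StringType), nullable = true))

  override def copy(extra: ParamMap): GetFeaturesTransformer = defaultCopy(extra)
}

Note this still does not make the stage persistable; for that you would additionally need to mix in something like DefaultParamsWritable or provide a custom MLWriter.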

Alternatively, you can register the UDF:

spark.udf.register("getFeatures", getFeatures _)  // Spark 2.x; on 1.x use sqlContext.udf.register

and use SQLTransformer:

import org.apache.spark.ml.feature.SQLTransformer

val transformer = new SQLTransformer()
  .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
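Since the original goal was to save the Pipeline, it is worth noting that SQLTransformer supports ML persistence out of the box, so a sketch like the following should work (the save path is just an example):

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val pipeline = new Pipeline().setStages(Array[PipelineStage](transformer))
val model = pipeline.fit(df)

// SQLTransformer is MLWritable, so the fitted pipeline round-trips to disk
model.write.overwrite().save("/tmp/featurize-pipeline")

// Caveat: UDF registrations are not persisted, so getFeatures has to be
// registered again in any session that loads the model
val reloaded = PipelineModel.load("/tmp/featurize-pipeline")
reloaded.transform(df).show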