Alb*_*nto 9 scala user-defined-functions apache-spark apache-spark-sql apache-spark-ml
I am trying to create and save a Pipeline with custom stages. I need to add a column to my DataFrame by using a UDF, so I was wondering whether it is possible to convert a UDF (or a similar action) into a Transformer?
My custom UDF looks like this, and I'd like to learn how to turn it into a custom Transformer that uses the UDF.
def getFeatures(n: String) = {
  val NUMBER_FEATURES = 4
  val name = n.split(" +")(0).toLowerCase
  ((1 to NUMBER_FEATURES)
    .filter(size => size <= name.length)
    .map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
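For context, the intent is roughly the following, where df and its string column name are just placeholders for the actual DataFrame:

import org.apache.spark.sql.functions.col

// add a column computed by the UDF registered above
val withFeatures = df.withColumn("features", tokenizeUDF(col("name")))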
zer*_*323 15
It is not a fully featured solution, but you can start with something like this:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], NGramTokenizer] {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = {
    getFeatures _
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  override protected def outputDataType: DataType = {
    new ArrayType(StringType, true)
  }
}
A quick check:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
You can even try to generalize it to something like this:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
  override val uid: String,
  f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {

  override protected def createTransformFunc: T => U = f

  override protected def validateInputType(inputType: DataType): Unit =
    require(inputType == schemaFor[T].dataType)

  override protected def outputDataType: DataType = schemaFor[U].dataType
}
val transformer = new UnaryUDFTransformer("featurize", getFeatures)
  .setInputCol("v")
  .setOutputCol("vs")
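Assuming the same df as in the quick check above, the generalized transformer should give identical output:

transformer.transform(df).show
// expected to match the NGramTokenizer output shown earlier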
If you want to use a UDF rather than wrapping a function, you have to extend Transformer directly and override the transform method. Unfortunately, the majority of the useful classes are private, so it can be rather tricky.
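Here is a minimal sketch of that approach, assuming Spark 2.x; the class name and the hard-coded input/output columns ("v" / "vs") are illustrative, since the HasInputCol / HasOutputCol helper traits are private[ml]:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

class GetFeaturesTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("getFeaturesTransformer"))

  // wrap the plain function in a UDF once and reuse it
  private val featurize = udf(getFeatures _)

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn("vs", featurize(col("v")))

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField("vs", ArrayType(StringType, true)))

  override def copy(extra: ParamMap): GetFeaturesTransformer = defaultCopy(extra)
}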
Alternatively, you can register the UDF:
spark.udf.register("getFeatures", getFeatures _)
and use an SQLTransformer:
import org.apache.spark.ml.feature.SQLTransformer
val transformer = new SQLTransformer()
  .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")
transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+