从Scala注册UDF到SqlContext以在PySpark中使用

And*_*rin 5 scala user-defined-functions apache-spark pyspark apache-zeppelin

是否可以注册用Scala编写的UDF(或函数)在PySpark中使用?例如:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
Run Code Online (Sandbox Code Playgroud)

在Scala中,现在可以使用以下内容:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中使用"UDFaddOne"

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Run Code Online (Sandbox Code Playgroud)

背景:我们是一个开发人员团队,一些用Scala编写,一些用Python编写,并且想分享已编写的函数.也可以将其保存到库中并导入它.

Ass*_*son 6

以下对我有用(基本上是多个地方的摘要,包括 zero323 提供的链接):

在斯卡拉:

package com.example
import org.apache.spark.sql.functions.udf

object udfObj extends Serializable {
  def createUDF = {
    udf((x: Int) => x + 1)
  }
}
Run Code Online (Sandbox Code Playgroud)

在python中(假设sc是spark上下文。如果您使用的是spark 2.0,则可以从spark会话中获取):

from py4j.java_gateway import java_import
from pyspark.sql.column import Column

jvm = sc._gateway.jvm
java_import(jvm, "com.example")
def udf_f(col):
    return Column(jvm.com.example.udfObj.createUDF().apply(col))
Run Code Online (Sandbox Code Playgroud)

当然,请确保使用 --jars 和 --driver-class-path 添加在 scala 中创建的 jar

那么这里会发生什么:

我们在可序列化对象中创建了一个函数,该函数在 Scala 中返回 udf(我不是 100% 确定需要 Serializable,对于更复杂的 UDF,我需要它,因此可能是因为它需要传递 java 对象)。

在 python 中,我们使用访问内部 jvm(这是一个私有成员,因此将来可以更改它,但我看不到它)并使用 java_import 导入我们的包。我们访问 createUDF 函数并调用它。这将创建一个具有 apply 方法的对象(Scala 中的函数实际上是具有 apply 方法的 java 对象)。apply 方法的输入是一列。应用列的结果是一个新列,所以我们需要用 Column 方法将它包装起来,使其可用于 withColumn。


zer*_*323 5

据我所知,PySpark没有提供任何等效callUDF功能,因此无法直接访问注册的UDF。

这里最简单的解决方案是使用原始SQL表达式:

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
Run Code Online (Sandbox Code Playgroud)

这种方法相当有限,因此,如果需要支持更复杂的工作流程,则应构建一个程序包并提供完整的Python包装器。您将在我对Spark的回答中找到并举例说明UDAF包装器:如何使用Scala或Java用户定义函数映射Python?