use*_*459 6 python scala apache-spark apache-spark-sql pyspark
我希望能够将Scala函数用作PySpark中的UDF
package com.test
object ScalaPySparkUDFs extends Serializable {
def testFunction1(x: Int): Int = { x * 2 }
def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
Run Code Online (Sandbox Code Playgroud)
我可以testFunction1
在PySpark 中访问它并返回值:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
Run Code Online (Sandbox Code Playgroud)
我想要做的就是将此函数用作UDF,最好是在withColumn
通话中使用:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)
我认为这里有一个很有前途的方法: Spark:如何用Scala或Java用户定义函数映射Python?
但是,在对其中的代码进行更改时,可以改为使用testUDFFunction1
:
def udf_test(col):
sc = SparkContext._active_spark_context
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
return Column(_f(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
我得到:
AttributeError: 'JavaMember' object has no attribute 'apply'
Run Code Online (Sandbox Code Playgroud)
我不明白这是因为我相信testUDFFunction1
有申请方法吗?
我不想使用在这里找到的类型的表达式:将 UDF从Scala注册到SqlContext,以便在PySpark中使用
任何有关如何使这项工作的建议,将不胜感激!
同意@user6910411,你必须直接在函数上调用apply方法。所以,你的代码将是。
Scala 中的 UDF:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
object ScalaPySparkUDFs {
def testFunction1(x: Int): Int = { x * 2 }
def getFun(): UserDefinedFunction = udf(testFunction1 _ )
}
Run Code Online (Sandbox Code Playgroud)
PySpark 代码:
def test_udf(col):
sc = spark.sparkContext
_test_udf = sc._jvm.com.test.ScalaPySparkUDFs.getFun()
return Column(_test_udf.apply(_to_seq(sc, [col], _to_java_column)))
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", test_udf(numbers['Value']))
Run Code Online (Sandbox Code Playgroud)
您链接的问题是使用 Scala object
。Scalaobject
是单例的,你可以apply
直接使用方法。
这里你使用一个 nullary 函数,它返回一个UserDefinedFunction
co 类的对象,你必须首先调用该函数:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3331 次 |
最近记录: |