Mit*_*aJ9 6 apache-spark apache-spark-sql
这是关于transform高阶函数(https://issues.apache.org/jira/browse/SPARK-23908).
有没有办法将它用作标准功能(在包中org.apache.spark.sql.functions._)?
我有一个字符串数组,我想对每个字符串应用URI规范化.现在我用UDF做了.我刚刚用spark 2.4.0跳过它,我可以跳过UDF.
当我看到它应该使用selectExpr类似df.selectExpr("transform(i, x -> x + 1)"),但它只是为了与使用selectExpr?
无论如何使用它来提供转换的自定义功能?有没有办法实现它,还是应该使用好的旧UDF?
无论如何使用它作为org.apache.spark.sql.functions._包中的标准函数?
目前它只适用于SQL表达式,但如果你想返回Column你的用途expr:
org.apache.spark.sql.functions._
expr("transform(i, x -> x + 1)"): Column
Run Code Online (Sandbox Code Playgroud)
无论如何使用它来提供转换的自定义功能?
可以使用Scala UDF*:
spark.udf.register("f", (x: Int) => x + 1)
Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
.withColumn("xsinc", expr("transform(xs, x -> f(x))"))
.show
Run Code Online (Sandbox Code Playgroud)
+---+---------+---------+
| id| xs| xsinc|
+---+---------+---------+
| 1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+
Run Code Online (Sandbox Code Playgroud)
虽然它似乎没有比UDF提供任何真正的好处Seq.
*对Python UDF的部分支持似乎已经到位(udfs被识别,类型被正确派生,调用也被调度),但是从2.4.0开始,序列化机制似乎被打破(所有记录都传递给UDF) as None):
from typing import Optional
from pyspark.sql.functions import expr
sc.version
Run Code Online (Sandbox Code Playgroud)
'2.4.0'
Run Code Online (Sandbox Code Playgroud)
def f(x: Optional[int]) -> Optional[int]:
return x + 1 if x is not None else None
spark.udf.register('f', f, "integer")
df = (spark
.createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
.withColumn("xsinc", expr("transform(xs, x -> f(x))")))
df.printSchema()
Run Code Online (Sandbox Code Playgroud)
root
|-- id: long (nullable = true)
|-- xs: array (nullable = true)
| |-- element: long (containsNull = true)
|-- xsinc: array (nullable = true)
| |-- element: integer (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
df.show()
Run Code Online (Sandbox Code Playgroud)
+---+---------+-----+
| id| xs|xsinc|
+---+---------+-----+
| 1|[1, 2, 3]| [,,]|
+---+---------+-----+
Run Code Online (Sandbox Code Playgroud)
当然,这里没有真正的性能提升潜力 - 它的调度BasePythonRunner应该与普通的开销相同udf.