如何使用转换高阶函数?

Mit*_*aJ9 6 apache-spark apache-spark-sql

这是关于transform高阶函数(https://issues.apache.org/jira/browse/SPARK-23908).

有没有办法将它用作标准功能(在包中org.apache.spark.sql.functions._)?

我有一个字符串数组,我想对每个字符串应用URI规范化.现在我用UDF做了.我刚刚用spark 2.4.0跳过它,我可以跳过UDF.

当我看到它应该使用selectExpr类似df.selectExpr("transform(i, x -> x + 1)"),但它只是为了与使用selectExpr

无论如何使用它来提供转换的自定义功能?有没有办法实现它,还是应该使用好的旧UDF?

104*_*ica 7

无论如何使用它作为org.apache.spark.sql.functions._包中的标准函数?

目前它只适用于SQL表达式,但如果你想返回Column你的用途expr:

org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column
Run Code Online (Sandbox Code Playgroud)

无论如何使用它来提供转换的自定义功能?

可以使用Scala UDF*:

spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show
Run Code Online (Sandbox Code Playgroud)
+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+
Run Code Online (Sandbox Code Playgroud)

虽然它似乎没有比UDF提供任何真正的好处Seq.


*对Python UDF的部分支持似乎已经到位(udfs被识别,类型被正确派生,调用也被调度),但是从2.4.0开始,序列化机制似乎被打破(所有记录都传递给UDF) as None):

from typing import Optional
from pyspark.sql.functions import expr

sc.version
Run Code Online (Sandbox Code Playgroud)
'2.4.0'
Run Code Online (Sandbox Code Playgroud)
def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()
Run Code Online (Sandbox Code Playgroud)
root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
df.show()
Run Code Online (Sandbox Code Playgroud)
+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+
Run Code Online (Sandbox Code Playgroud)

当然,这里没有真正的性能提升潜力 - 它的调度BasePythonRunner应该与普通的开销相同udf.