每个 Spark UDAF 都可以与 Window 一起使用吗?

Rap*_*oth 6 scala user-defined-aggregate dataframe apache-spark

我一直认为 Spark 不允许定义用户定义的窗口函数。我刚刚从这里测试了“几何平均值”UDAF 示例(https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)作为窗口函数,它似乎工作得很好,例如:

val geomMean = new GeometricMean

(1 to 10).map(i=>
  (i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Run Code Online (Sandbox Code Playgroud)

我从未见过 Spark 文档谈论使用 UDAF 作为窗口函数。这是允许的吗?即结果是否正确?顺便说一下我正在使用spark 2.1

编辑:

让我困惑的是,在标准聚合中(即后跟 a groupBy),数据总是添加到缓冲区中,即它们总是会增长,从不收缩。使用窗口函数(特别是与 结合使用rowsBetween()),数据还需要从缓冲区中删除,因为“旧”元素在沿着排序定义的行移动时会从窗口中删除。我认为窗口函数可以沿着状态的顺序移动。所以我认为必须有类似“删除”方法的东西要实现

ast*_*asz 4

我不确定你的问题到底是什么。

每个 Spark UDAF 都可以与 Window 一起使用吗?

是的

以下是我在这个主题上的个人经验:

我最近经常使用 Sparkwindow functionsUDAFs(Spark 2.0.1),我确认它们可以很好地协同工作。结果是正确的(假设您的 UDAF 编写正确)。编写 UDAF 有点麻烦,但是一旦你掌握了它,下一个的速度就会很快。

我没有测试所有这些,但内置的聚合函数org.apache.spark.sql.functions._也对我有用。在函数中搜索聚合。我主要使用一些经典聚合器,例如、、,它们都返回正确的值。sumcountavgstddev