我使用此MSDN页面中的代码创建用户定义的聚合以group by's
在SQL Server中连接字符串.我的一个要求是连接值的顺序与查询中的顺序相同.例如:
Value Group
1 1
2 1
3 2
4 2
Run Code Online (Sandbox Code Playgroud)
使用查询
SELECT
dbo.Concat(tbl.Value) As Concat,
tbl.Group
FROM
(SELECT TOP 1000
tblTest.*
FROM
tblTest
ORDER BY
tblTest.Value) As tbl
GROUP BY
tbl.Group
Run Code Online (Sandbox Code Playgroud)
会导致:
Concat Group
"1,2" 1
"3,4" 2
Run Code Online (Sandbox Code Playgroud)
结果似乎总是正确和正如预期的那样,但是我遇到的这个页面表明订单不能保证,并且该属性SqlUserDefinedAggregateAttribute.IsInvariantToOrder
仅保留供将来使用.
所以我的问题是:假设字符串中的连接值可以以任何顺序结束是否正确?
如果是这种情况,为什么MSDN页面上的示例代码使用该IsInvariantToOrder
属性?
我试图在Spark中定义UserDefinedAggregateFunction(UDAF),它计算组的列中每个唯一值的出现次数.
这是一个例子:假设我有一个df
这样的数据帧,
+----+----+
|col1|col2|
+----+----+
| a| a1|
| a| a1|
| a| a2|
| b| b1|
| b| b2|
| b| b3|
| b| b1|
| b| b1|
+----+----+
Run Code Online (Sandbox Code Playgroud)
我将有一个UDAF DistinctValues
val func = new DistinctValues
Run Code Online (Sandbox Code Playgroud)
然后我将它应用于数据帧df
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
Run Code Online (Sandbox Code Playgroud)
我期待有这样的事情:
+----+--------------------------+
|col1|DV |
+----+--------------------------+
| a| Map(a1->2, a2->1) |
| b| Map(b1->3, b2->1, b3->1)|
+----+--------------------------+
Run Code Online (Sandbox Code Playgroud)
所以我推出了像这样的UDAF,
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import …
Run Code Online (Sandbox Code Playgroud) 我一直认为 Spark 不允许定义用户定义的窗口函数。我刚刚从这里测试了“几何平均值”UDAF 示例(https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)作为窗口函数,它似乎工作得很好,例如:
val geomMean = new GeometricMean
(1 to 10).map(i=>
(i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()
+---+----+------------------+
| i| x| geom_mean|
+---+----+------------------+
| 1| 1.0|1.4142135623730951|
| 2| 2.0|1.8171205928321397|
| 3| 3.0|2.8844991406148166|
| 4| 4.0|3.9148676411688634|
| 5| 5.0| 4.93242414866094|
| 6| 6.0| 5.943921952763129|
| 7| 7.0| 6.952053289772898|
| 8| 8.0| 7.958114415792783|
| 9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Run Code Online (Sandbox Code Playgroud)
我从未见过 Spark 文档谈论使用 UDAF 作为窗口函数。这是允许的吗?即结果是否正确?顺便说一下我正在使用spark 2.1
编辑:
让我困惑的是,在标准聚合中(即后跟 a groupBy
),数据总是添加到缓冲区中,即它们总是会增长,从不收缩。使用窗口函数(特别是与 结合使用rowsBetween()
),数据还需要从缓冲区中删除,因为“旧”元素在沿着排序定义的行移动时会从窗口中删除。我认为窗口函数可以沿着状态的顺序移动。所以我认为必须有类似“删除”方法的东西要实现
我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。
动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:
信用卡交易在 30 天内与当前交易在同一国家/地区进行了多少次?
该窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
Run Code Online (Sandbox Code Playgroud)
我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)
并感谢.desc
当前交易在计算过程中出现在窗口中的第一个。
UDAF 函数需要实现merge
在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction
不同缓冲区的my可能不相同,UDAF 的结果将不正确。
我编写了一个 UDAF 函数,该函数计算我的数据集上的合并次数,并仅保留窗口中的第一个事务以与当前事务进行比较。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic …
Run Code Online (Sandbox Code Playgroud) 我试图找到一种方法来计算给定数据帧的中位数.
val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2")
+----+----+
|col1|col2|
+----+----+
| a| 1.0|
| a| 2.0|
| a| 3.0|
| b| 6.0|
| b| 8.0|
+----+----+
Run Code Online (Sandbox Code Playgroud)
现在我想做那样的事情:
df.groupBy("col1").agg(calcmedian("col2"))
结果应如下所示:
+----+------+
|col1|median|
+----+------+
| a| 2.0|
| b| 7.0|
+----+------+`
Run Code Online (Sandbox Code Playgroud)
因此calcmedian()必须是UDAF,但问题是,UDAF的"evaluate"方法只需要一行,但我需要整个表来对值进行排序并返回中位数...
// Once all entries for a group are exhausted, spark will evaluate to get the final result
def evaluate(buffer: Row) = {...}
Run Code Online (Sandbox Code Playgroud)
这有可能吗?或者还有另一个不错的解决方法吗?我想强调,我知道如何计算"一组"数据集的中位数.但我不想在"foreach"循环中使用此算法,因为这是低效的!
谢谢!
编辑:
这是我到目前为止所尝试的:
object calcMedian extends UserDefinedAggregateFunction {
// Schema you get as an input
def …
Run Code Online (Sandbox Code Playgroud)