标签: user-defined-aggregate

SQL用户定义的值的聚合顺序是否保留?

我使用此MSDN页面中的代码创建用户定义的聚合以group by's在SQL Server中连接字符串.我的一个要求是连接值的顺序与查询中的顺序相同.例如:

Value   Group
1       1
2       1
3       2
4       2
Run Code Online (Sandbox Code Playgroud)

使用查询

SELECT
  dbo.Concat(tbl.Value) As Concat,
  tbl.Group
FROM
  (SELECT TOP 1000
     tblTest.*
  FROM 
    tblTest
  ORDER BY 
    tblTest.Value) As tbl
GROUP BY
  tbl.Group
Run Code Online (Sandbox Code Playgroud)

会导致:

Concat  Group
"1,2"   1
"3,4"   2
Run Code Online (Sandbox Code Playgroud)

结果似乎总是正确和正如预期的那样,但是我遇到的这个页面表明订单不能保证,并且该属性SqlUserDefinedAggregateAttribute.IsInvariantToOrder仅保留供将来使用.

所以我的问题是:假设字符串中的连接值可以以任何顺序结束是否正确?
如果是这种情况,为什么MSDN页面上的示例代码使用该IsInvariantToOrder属性?

c# sql clr user-defined-aggregate sql-server-2008

12
推荐指数
1
解决办法
1465
查看次数

为什么Mutable映射在Spark中的UserDefinedAggregateFunction(UDAF)中自动变为不可变

我试图在Spark中定义UserDefinedAggregateFunction(UDAF),它计算组的列中每个唯一值的出现次数.

这是一个例子:假设我有一个df这样的数据帧,

+----+----+
|col1|col2|
+----+----+
|   a|  a1|
|   a|  a1|
|   a|  a2|
|   b|  b1|
|   b|  b2|
|   b|  b3|
|   b|  b1|
|   b|  b1|
+----+----+
Run Code Online (Sandbox Code Playgroud)

我将有一个UDAF DistinctValues

val func = new DistinctValues
Run Code Online (Sandbox Code Playgroud)

然后我将它应用于数据帧df

val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
Run Code Online (Sandbox Code Playgroud)

我期待有这样的事情:

+----+--------------------------+
|col1|DV                        |
+----+--------------------------+
|   a|  Map(a1->2, a2->1)       |
|   b|  Map(b1->3, b2->1, b3->1)|
+----+--------------------------+
Run Code Online (Sandbox Code Playgroud)

所以我推出了像这样的UDAF,

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import …
Run Code Online (Sandbox Code Playgroud)

scala mutable user-defined-aggregate apache-spark

7
推荐指数
1
解决办法
1569
查看次数

每个 Spark UDAF 都可以与 Window 一起使用吗?

我一直认为 Spark 不允许定义用户定义的窗口函数。我刚刚从这里测试了“几何平均值”UDAF 示例(https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)作为窗口函数,它似乎工作得很好,例如:

val geomMean = new GeometricMean

(1 to 10).map(i=>
  (i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Run Code Online (Sandbox Code Playgroud)

我从未见过 Spark 文档谈论使用 UDAF 作为窗口函数。这是允许的吗?即结果是否正确?顺便说一下我正在使用spark 2.1

编辑:

让我困惑的是,在标准聚合中(即后跟 a groupBy),数据总是添加到缓冲区中,即它们总是会增长,从不收缩。使用窗口函数(特别是与 结合使用rowsBetween()),数据还需要从缓冲区中删除,因为“旧”元素在沿着排序定义的行移动时会从窗口中删除。我认为窗口函数可以沿着状态的顺序移动。所以我认为必须有类似“删除”方法的东西要实现

scala user-defined-aggregate dataframe apache-spark

6
推荐指数
1
解决办法
1852
查看次数

Spark 中的用户定义聚合函数 UDAF 何时发生合并

我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。

动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:

信用卡交易在 30 天内与当前交易在同一国家/地区进行了多少次?

该窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)

df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))
Run Code Online (Sandbox Code Playgroud)

我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算过程中出现在窗口中的第一个。

UDAF 函数需要实现merge在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction不同缓冲区的my可能不相同,UDAF 的结果将不正确。

我编写了一个 UDAF 函数,该函数计算我的数据集上的合并次数,并仅保留窗口中的第一个事务以与当前事务进行比较。

 class FirstUDAF() extends UserDefinedAggregateFunction {

  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)

  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)

  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)

  def deterministic …
Run Code Online (Sandbox Code Playgroud)

scala user-defined-aggregate apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1260
查看次数

Spark Scala:用户定义的计算中位数的聚合函数

我试图找到一种方法来计算给定数据帧的中位数.

val df = sc.parallelize(Seq(("a",1.0),("a",2.0),("a",3.0),("b",6.0), ("b", 8.0))).toDF("col1", "col2")

+----+----+
|col1|col2|
+----+----+
|   a| 1.0|
|   a| 2.0|
|   a| 3.0|
|   b| 6.0|
|   b| 8.0|
+----+----+
Run Code Online (Sandbox Code Playgroud)

现在我想做那样的事情:
df.groupBy("col1").agg(calcmedian("col2"))

结果应如下所示:

+----+------+
|col1|median|
+----+------+
|   a|   2.0|
|   b|   7.0|
+----+------+` 
Run Code Online (Sandbox Code Playgroud)

因此calcmedian()必须是UDAF,但问题是,UDAF的"evaluate"方法只需要一行,但我需要整个表来对值进行排序并返回中位数...

// Once all entries for a group are exhausted, spark will evaluate to get the final result  
def evaluate(buffer: Row) = {...}
Run Code Online (Sandbox Code Playgroud)

这有可能吗?或者还有另一个不错的解决方法吗?我想强调,我知道如何计算"一组"数据集的中位数.但我不想在"foreach"循环中使用此算法,因为这是低效的!

谢谢!


编辑:

这是我到目前为止所尝试的:

object calcMedian extends UserDefinedAggregateFunction {
    // Schema you get as an input 
    def …
Run Code Online (Sandbox Code Playgroud)

scala group-by median user-defined-aggregate apache-spark

4
推荐指数
1
解决办法
2310
查看次数