ast*_*asz 5 scala user-defined-aggregate apache-spark apache-spark-sql
我想知道在什么情况下 Spark 将执行合并作为 UDAF 功能的一部分。
动机: 我在 Spark 项目中的一个窗口上使用了很多 UDAF 函数。我经常想回答这样的问题:
信用卡交易在 30 天内与当前交易在同一国家/地区进行了多少次?
该窗口将从当前事务开始,但不会将其包含在计数中。它需要当前交易的价值才能知道过去 30 天内要计算哪个国家。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
Run Code Online (Sandbox Code Playgroud)
我写了我的 customUDAF 来进行计数。我总是使用.orderBy(orderByColumn.desc)并感谢.desc当前交易在计算过程中出现在窗口中的第一个。
UDAF 函数需要实现merge在并行计算中合并两个中间聚合缓冲区的函数。如果发生任何合并,current transaction不同缓冲区的my可能不相同,UDAF 的结果将不正确。
我编写了一个 UDAF 函数,该函数计算我的数据集上的合并次数,并仅保留窗口中的第一个事务以与当前事务进行比较。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
Run Code Online (Sandbox Code Playgroud)
当我在具有 16 个 cpu 的本地主机上使用 spark 2.0.1 运行它时,永远不会有任何合并,并且窗口中的第一个事务始终是当前事务。这就是我要的。在不久的将来,我将在一个 x100 更大的数据集和真正的分布式 Spark 集群上运行我的代码,并想知道合并是否可以在那里发生。
问题:
UDAF 在什么情况/条件下进行合并?
merge当聚合函数的部分应用程序(“map 侧聚合”)在 shuffle(“reduce 侧聚合”)后合并时调用。
Windows 与 orderBy 曾经有过合并吗?
在目前的实施中从未。至于现在的窗口函数只是很奇特groupByKey,并且没有部分聚合。这当然是实现细节,将来可能会更改,恕不另行通知。
是否可以告诉 Spark 不要进行合并?
它不是。但是,如果数据已经按聚合键分区,则不需要merge并且仅combine使用。
最后:
在 30 天内,与当前交易相同的国家/地区进行了多少次信用卡交易?
不调用UDAFs或 窗口函数。我可能会使用 创建翻滚窗口o.a.s.sql.functions.window,按用户、国家/地区和窗口进行聚合,然后与输入连接回来。
| 归档时间: |
|
| 查看次数: |
1260 次 |
| 最近记录: |