yon*_*nil 4 unit-testing scala user-defined-functions apache-spark apache-spark-sql
星火UDAFs需要您实现多种方法,特别是
def update(buffer: MutableAggregationBuffer, input: Row): Unit
和
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
假设我的测试中有一个UDAF X,4行(r0, r1, r2, r3)和两个聚合缓冲区A, B。我想看看这段代码产生了预期的结果:
X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)
Run Code Online (Sandbox Code Playgroud)
与仅使用一个缓冲区在4行中的每行上调用X.update相同:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)
Run Code Online (Sandbox Code Playgroud)
这样,可以测试两种方法的正确性。但是,我不知道如何编写这样的测试:用户代码似乎无法实例化的任何实现MutableAggregationBuffer。
如果仅从4行中创建一个DF,并尝试使用它groupBy().agg(...)来调用UDAF,Spark甚至不会尝试以这种特定方式合并它们-由于行数很少,因此不需要。
MutableAggregationBuffer只是一个抽象类。您可以轻松创建自己的实现,例如这样的实现:
import org.apache.spark.sql.expressions._
class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
val values: Array[Any] = init
def update(i: Int, value: Any) = values(i) = value
def get(i: Int): Any = values(i)
def length: Int = init.size
def copy() = new DummyBuffer(values)
}
Run Code Online (Sandbox Code Playgroud)
它不会替代“真实的东西”,但对于简单的测试场景而言应该足够了。
| 归档时间: |
|
| 查看次数: |
339 次 |
| 最近记录: |