yon*_*nil 6 scala apache-spark spark-dataframe
我的Spark应用程序是用Scala编写的.我正在写几个UDAF,它们对"矢量化"数据执行计算,其中每个值都是一个恒定大小(比如16)的双精度数组,而不是一个标量值.计算基于每个元素进行.我的目标是让UDAF尽可能高效地执行.为此,我希望双打在内存中连续出现,并且看到Spark的代码gen + JVM的JIT编译器将通过SIMD指令进行计算.
但是,似乎以直接的文档方式编写UDAF并使用标准DataFrame功能导致Spark为其Aggregator和Row对象生成非常低效的布局.数组中的数据到达我的代码,类型为WrappedArray [Double].它是Object []数组的代理,包含16个盒装双打.这不仅占实际原始数据的大约6-8倍的内存量,而且还使得不可能使用SIMD指令,因为双精度本身不在连续的存储器位置中.
例如,一个执行"矢量化和"的简单UDAF具有如下所示的更新函数:
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val acc = buffer.getAs[WrappedArray[Double]](0)
if (!input.isNullAt(0)) {
val value = input.getAs[WrappedArray[Double]](0)
for (i <- 0 until N) {
acc(i) += value(i)
}
buffer(0) = acc
}
}
Run Code Online (Sandbox Code Playgroud)
这是一个操作,在一个写得很好的本机程序中,看起来像这样:
void update(double* acc, const double* input) {
for(size_t i = 0; i != N; ++i) {
acc[i] += input[i]
}
}
Run Code Online (Sandbox Code Playgroud)
我的UDAF发生了什么?据我所知,只是我需要最后buffer(0) = acc一行存在(或累加器不更新)这意味着正在复制数组内容.因此,它首先创建大小为N(acc和value)的新的两个对象数组,将原始的盒装双打复制到它们中,然后+ =创建一个新的盒装Double,其中包含每个元素的结果,将它们放回到acc中,然后是数组acc被复制回缓冲区(0)中的数组.
这简直太糟糕了.我还没有运行任何分析,但我完全期望第二个代码片段运行速度比这快20-50倍.
必须有更好的方法来做事.我刚才读过关于"Project Tungsten"的内容,显然Spark可以使用非托管内存缓冲区运行 - 我不是Spark开发人员,所以我甚至不知道我是否可以将这些功能用于我的UDAF,如果是这样的话?有没有其他方法来至少消除拳击和无用的阵列复制?
编辑:示例输入和输出如下 -
case class TestInRow(id: Int, value: Array[Double])
// A UDAF that sums up arrays of 4 doubles
val sumVec = MyUDAFs.SumVec(4)
val inputDF = Seq(
TestInRow(0, Array[Double](1, 2, 5, 10)),
TestInRow(1, Array[Double](0, -1, 5, -12)),
TestInRow(2, Array[Double](0, 0, 5, 10)),
TestInRow(3, Array[Double](0, -1, 5, -8))
).toDF
val resultDF = inputDF.groupBy().agg(sumVec(col("value")) as "svec")
val output = resultDF.head.getAs[WrappedArray[Double]](0)
assert(output == Seq(1.0, 0.0, 20.0, 0.0))
Run Code Online (Sandbox Code Playgroud)
注意:在实际测试中我实际上是用这一行和它中的期望值创建一个expectedDF,然后通过加入一个关键元组(在这种情况下是一个空元组)来调用一个比较DF的函数,并检查所有的双打. "value"列匹配在容错范围内.
注意:这取自单元测试.在我的实际用例中,数组最初是由另一个包含标量的DF上的select(...)调用中的数组(expr*)构造形成的.像这样的东西:
df.select(
k1,
k2,
k3,
array(
sum(when(expr(predicates(0)(k4, k5)), qty).otherwise(lit(0.0))),
sum(when(expr(predicates(1)(k4, k5)), qty).otherwise(lit(0.0))),
sum(when(expr(predicates(2)(k4, k5)), qty).otherwise(lit(0.0))),
...
sum(when(expr(predicates(N-1)(k4, k5)), qty).otherwise(lit(0.0)))
) as "packed_qty"
)
Run Code Online (Sandbox Code Playgroud)
然后我想与vectorizedSum(packed_qty)聚合,例如在一个sql查询中,它在(k1,k2,k3)的几个组合上有GROUP BY ... GROUPING SETS(...).
还有其他几个UDAF,它们的工作原理相同:输入是N个双精度数组,它在每个元素的基础上对它们进行一些FP操作(或几个),并将结果聚合到数组中(s )大小为N.
| 归档时间: |
|
| 查看次数: |
258 次 |
| 最近记录: |