Mat*_*att 5 scala distributed-computing lazy-evaluation apache-spark rdd
我想为我的 Spark 程序执行速度计时,但由于懒惰,这非常困难。让我们在这里考虑一下这个(无意义的)代码:
var graph = GraphLoader.edgeListFile(context, args(0))
val graph_degs = graph.outerJoinVertices(graph.degrees).triplets.cache
/* I'd need to start the timer here */
val t1 = System.currentTimeMillis
val edges = graph_degs.flatMap(trip => { /* do something*/ })
.union(graph_degs)
val count = edges.count
val t2 = System.currentTimeMillis
/* I'd need to stop the timer here */
println("It took " + t2-t1 + " to count " + count)
Run Code Online (Sandbox Code Playgroud)
问题是,转换是懒惰的,所以val count = edges.count在行之前没有任何评估。但是根据我的观点,t1尽管上面的代码没有值,但还是得到了一个值……t1尽管在代码中的位置,但在计时器启动后对上面的代码进行了评估。那是个问题...
在 Spark Web UI 中,我找不到任何有趣的东西,因为我需要在该特定代码行之后花费时间。您认为是否有一个简单的解决方案可以查看何时对一组转换进行真实评估?
由于连续的变换(在同一内的任务-这意味着,它们不是由分开的混洗和作为相同的一部分执行的动作),为单一的“步骤”,火花不执行不单独地测量它们。从驱动程序代码 - 你也不能。
您可以做的是测量将您的函数应用于每条记录的持续时间,并使用累加器将其汇总,例如:
// create accumulator
val durationAccumulator = sc.longAccumulator("flatMapDuration")
// "wrap" your "doSomething" operation with time measurement, and add to accumulator
val edges = rdd.flatMap(trip => {
val t1 = System.currentTimeMillis
val result = doSomething(trip)
val t2 = System.currentTimeMillis
durationAccumulator.add(t2 - t1)
result
})
// perform the action that would trigger evaluation
val count = edges.count
// now you can read the accumulated value
println("It took " + durationAccumulator.value + " to flatMap " + count)
Run Code Online (Sandbox Code Playgroud)
您可以对任何单个转换重复此操作。
免责声明:
样式说明:您可以通过创建一个measure“包装”任何函数并更新给定累加器的函数来使此代码更具可重用性:
// write this once:
def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => {
val t1 = System.currentTimeMillis
val result = action(input)
val t2 = System.currentTimeMillis
acc.add(t2 - t1)
result
}
// use it with any transformation:
rdd.flatMap(measure(doSomething, durationAccumulator))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3503 次 |
| 最近记录: |