Dan*_*don 8 scala apache-spark apache-spark-1.3
我正在运行Spark作业来聚合数据.我有一个名为Profile的自定义数据结构,它基本上包含一个mutable.HashMap[Zone, Double]
.我想使用以下代码合并共享给定密钥(UUID)的所有配置文件:
def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
.aggregateByKey(new Profile(), 3200)(merge, merge).cache()
Run Code Online (Sandbox Code Playgroud)
奇怪的是,Spark失败并出现以下错误:
org.apache.spark.SparkException:作业因阶段失败而中止:116318任务的序列化结果总大小(1024.0 MB)大于spark.driver.maxResultSize(1024.0 MB)
显而易见的解决方案是增加"spark.driver.maxResultSize",但有两件事让我困惑.
take()
或者collect()
),但我没有把任何东西带到驱动程序,只是从HDFS读取,聚合,保存回HDFS.有谁知道我为什么会收到这个错误?
是的,它失败了,因为我们在异常消息中看到的值按一位精度四舍五入,并且比较以字节为单位。
该序列化输出必须大于 1024.0 MB 且小于 1024.1 MB。
检查添加的 Apache Spark 代码片段,非常有趣,而且很少出现此错误。:)
这里totalResultSize > maxResultSize
两者都是 Long 类型,并且 in 保存以字节为单位的值。但msg
保留 的四舍五入值 Utils.bytesToString()
。
//TaskSetManager.scala
def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
totalResultSize += size
calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
s"(${Utils.bytesToString(maxResultSize)})"
logError(msg)
abort(msg)
false
} else {
true
}
}
Run Code Online (Sandbox Code Playgroud)
//Utils.scala
def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10
val (value, unit) = {
if (size >= 2*TB) {
(size.asInstanceOf[Double] / TB, "TB")
} else if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
} else if (size >= 2*MB) {
(size.asInstanceOf[Double] / MB, "MB")
} else if (size >= 2*KB) {
(size.asInstanceOf[Double] / KB, "KB")
} else {
(size.asInstanceOf[Double], "B")
}
}
"%.1f %s".formatLocal(Locale.US, value, unit)
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
2158 次 |
最近记录: |