`maxResultSize` 包含什么以及所有驱动程序内存在哪里

kap*_*nga 2 scala scalability internals apache-spark

我们使用 Apache Spark 2.1.1 生成一些每日报告。这些报告是根据一些日常数据生成的,我们在分别对每个单元运行报告并将它们合并在一起之前保留这些数据。这是我们正在做的事情的简化版本:

def unitReport(d: Date, df: DataFrame, u: String): DataFrame = ... // Builds a report based on unit `u`
val date: Date = ... // Date to run the report
val dailyData: DataFrame = someDailyData.persist()  // Daily data

val units: Seq[String] = Seq("Unit_A", "Unit_B", "Unit_C")
val report: DataFrame =
  units.map(unitReport(date, dailyData, _)) // Report for each unit.
    .reduce((a, b) => a.union(b))           // Join all the units together.
Run Code Online (Sandbox Code Playgroud)

之后,我们将报告以 csv 格式写入 HDFS,将各部分连接在一起,然后通过电子邮件发送报告。

我们开始在处理这些报告中最大的一份(大约有五十个单位)时遇到问题。我们不断提高最大结果大小(现在为 10G)以及驱动器内存,并不断提高它。这里令人困惑的是 a) 我们不会将结果返回给驱动程序 b) 最终输出的报告仅占用 CSV 形式的 145k 和 1298 行,为什么我们要传递 8G 的内容maxResultSize?我们觉得对于 Spark 如何管理内存、到底包含什么resultSize以及发送回驱动程序的内容有一些我们不了解的地方,但很难找到任何解释或文档。以下是报告最后阶段的片段,就在内存开始耗尽之前,让您了解报告的复杂性:

[Stage 2297:===========================================>    (4822 + 412) / 5316]
[Stage 2297:===========================================>    (4848 + 394) / 5316]
[Stage 2297:============================================>   (4877 + 370) / 5316]
[Stage 2297:============================================>   (4909 + 343) / 5316]
[Stage 2297:============================================>   (4944 + 311) / 5316]
[Stage 2297:============================================>   (4964 + 293) / 5316]
[Stage 2297:============================================>   (4980 + 278) / 5316]
[Stage 2297:=============================================>  (4996 + 266) / 5316]
[Stage 2297:=============================================>  (5018 + 246) / 5316]
Run Code Online (Sandbox Code Playgroud)

我们通过以下代码发现了我们认为的类似记忆效应:

import org.apache.spark.mllib.random.RandomRDDs._
val df = normalRDD(sc, 1000000000L, 1000000).toDF()
df.filter($"value" > 0.9).count()
Run Code Online (Sandbox Code Playgroud)

虽然此代码仅返回一个简单的计数,但当我们最终在驱动程序上遇到内存不足错误时:

java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:174)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at scala.collection.generic.Growable$class.loop$1(Growable.scala:53)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:57)
Run Code Online (Sandbox Code Playgroud)

当我们监视驱动程序上的日志时,我们发现它不断进行完整的垃圾收集,并且总体内存不断增加:

2.095: [GC [PSYoungGen: 64512K->8399K(74752K)] 64512K->8407K(244224K), 0.0289150 secs] [Times: user=0.05 sys=0.02, real=0.02 secs] 
3.989: [GC [PSYoungGen: 72911K->10235K(139264K)] 72919K->10709K(308736K), 0.0257280 secs] [Times: user=0.04 sys=0.02, real=0.02 secs] 
5.936: [GC [PSYoungGen: 139259K->10231K(139264K)] 139733K->67362K(308736K), 0.0741340 secs] [Times: user=0.40 sys=0.12, real=0.07 secs] 
10.842: [GC [PSYoungGen: 139255K->10231K(268288K)] 196386K->86311K(437760K), 0.0678030 secs] [Times: user=0.28 sys=0.07, real=0.07 secs] 
19.282: [GC [PSYoungGen: 268279K->10236K(268288K)] 344359K->122829K(437760K), 0.0642890 secs] [Times: user=0.32 sys=0.10, real=0.06 secs] 
22.981: [GC [PSYoungGen: 268284K->30989K(289792K)] 380877K->143582K(459264K), 0.0811960 secs] [Times: user=0.20 sys=0.07, real=0.08 secs] 
Run Code Online (Sandbox Code Playgroud)

有人知道发生了什么事吗?任何解释或指向文档的指示将不胜感激。

Tza*_*har 6

很难确定,但我猜这与DataFrame 中的分区总数(即减少的结果)有关,并且您拥有的单位越多,该数字可能就越大,因为分区的数量ina.union(b)ab的分区计数之和。

虽然数据不存储在驱动程序上或发送到驱动程序,但驱动程序确实管理代表所有分区的对象以及分配给每个分区的任务;如果您的 DataFrame 最终有数百万个分区,Driver 将创建(然后使用 GC 收集)数百万个对象。

因此,尝试更改联合操作以包含coalesce限制分区总数的操作:

val MaxParts = dailyData.rdd.partitions.length * 2 // or anything, but something reasonable

val report: DataFrame =
  units.map(unitReport(date, dailyData, _))
    .reduce((a, b) => a.union(b).coalesce(MaxParts))
Run Code Online (Sandbox Code Playgroud)