我们使用 Apache Spark 2.1.1 生成一些每日报告。这些报告是根据一些日常数据生成的,我们在分别对每个单元运行报告并将它们合并在一起之前保留这些数据。这是我们正在做的事情的简化版本:
def unitReport(d: Date, df: DataFrame, u: String): DataFrame = ... // Builds a report based on unit `u`
val date: Date = ... // Date to run the report
val dailyData: DataFrame = someDailyData.persist() // Daily data
val units: Seq[String] = Seq("Unit_A", "Unit_B", "Unit_C")
val report: DataFrame =
units.map(unitReport(date, dailyData, _)) // Report for each unit.
.reduce((a, b) => a.union(b)) // Join all the units together.
Run Code Online (Sandbox Code Playgroud)
之后,我们将报告以 csv 格式写入 HDFS,将各部分连接在一起,然后通过电子邮件发送报告。
我们开始在处理这些报告中最大的一份(大约有五十个单位)时遇到问题。我们不断提高最大结果大小(现在为 10G)以及驱动器内存,并不断提高它。这里令人困惑的是 a) 我们不会将结果返回给驱动程序 b) …