火花内存不足

Igo*_*gor 10 scala apache-spark

我有一个包含150 G txt文件的文件夹(大约700个文件,平均每个200 MB).

我正在使用scala处理文件并最终计算一些聚合统计信息.我看到两种可行的方法:

  • 手动循环遍历所有文件,对每个文件进行计算并最终合并结果
  • 将整个文件夹读取到一个RDD,对此单个RDD执行所有操作,并让spark执行所有并行化

我倾向于第二种方法,因为它看起来更干净(不需要特定于并行化的代码),但我想知道我的方案是否适合我的硬件和数据所施加的限制.我有一个工作站,有16个线程和64 GB RAM可用(因此并行化将严格地在不同处理器核心之间本地化).我可能会在以后使用更多计算机扩展基础架构,但是现在我只想专注于调整这一个工作站场景的设置.

我正在使用的代码: - 读取TSV文件,并将有意义的数据提取到(String,String,String)三元组 - 然后执行一些过滤,映射和分组 - 最后,减少数据并计算一些聚合

我已经能够用一个单一的文件(〜200 MB的数据)来运行该代码,但是我收到java.lang.OutOfMemoryError:GC开销超过限制和/或Java进行添加更多的数据时,堆异常(在应用程序中断了6GB的数据,但我想将它与150 GB的数据一起使用).

我想我必须调整一些参数才能使其工作.我将不胜感激任何有关如何解决此问题的提示(如何调试内存需求).我已经尝试增加'spark.executor.memory'并使用较少数量的内核(理性的是每个内核需要一些堆空间),但这并没有解决我的问题.

我不需要解决方案非常快(如果需要,它可以轻松运行几个小时甚至几天).我也没有缓存任何数据,但最后只是将它们保存到文件系统中.如果您认为使用手动并行化方法更可行,我也可以这样做.

小智 4

我和我的团队已在 5 台机器(每台 32GB RAM)上成功处理了大小超过 1 TB 的 csv 数据。这在很大程度上取决于您正在执行何种处理以及如何处理。

  1. 如果您对 RDD 进行重新分区,则需要额外的计算,其开销超出了堆大小,请尝试通过减少 和 TextInputFormat.SPLIT_MINSIZETextInputFormat.SPLIT_MAXSIZE 如果您使用的是 TextInputFormat)的分割大小来加载具有更多并行性的文件,以提高并行性级别。

  2. 尝试使用mapPartition而不是map,这样您就可以处理分区内的计算。如果计算使用临时变量或实例,并且您仍然面临内存不足的问题,请尝试减少每个分区的数据数量(增加分区数量)

  3. 创建 Spark Context 之前,在 Spark 配置中使用“spark.executor.memory”和“spark.driver.memory”增加驱动程序内存和执行程序内存限制

请注意,Spark 是一个通用的集群计算系统,因此在单机上使用 Spark 效率很低(恕我直言)