我一直在试图弄清楚如何在循环遍历镶木地板文件和几个后处理函数时防止 Spark 因内存问题而崩溃。抱歉,文本泛滥,但这并不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的堆栈溢出形式,请道歉!
基本伪代码为:
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
Run Code Online (Sandbox Code Playgroud)
此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(不能将 sparkContext 或 sqlQuery 作为输入),而不是标准环形。
从技术上讲,这是一个具有分区的大型镶木地板文件,但一次读取并查询它的体积太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个镶木地板分区(子目录)并编写相关的输出报告。
由于整个镶木地板文件的大小,不确定将所有代码包装在一个大的 mapPartition() 周围是否可行?
但是在几次循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。(我已经确认循环崩溃的文件没有什么特别之处;它发生在第二个或第三个循环中读入的任何随机文件。)
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap …Run Code Online (Sandbox Code Playgroud)