在脚本中迭代/循环 Spark parquet 文件会导致内存错误/累积(使用 Spark SQL 查询)

kpl*_*ney 5 loops apache-spark apache-spark-sql pyspark pyspark-sql

我一直在试图弄清楚如何在循环遍历镶木地板文件和几个后处理函数时防止 Spark 因内存问题而崩溃。抱歉,文本泛滥,但这并不完全是一个特定的错误(我正在使用 PySpark。)如果这破坏了正确的堆栈溢出形式,请道歉!

基本伪代码为:

#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its  "=" subdirectory
for counter in fileNums:
  sparkDataFrame = sqlContext.read.parquet(counter)
  summaryReportOne = sqlContext.sql.("SELECT.....")
  summaryReportOne.write.partition("id").parquet("/")
  summaryReportTwo = sqlContext.sql.("SELECT....")
  summaryReportTwo.write.partition("id").parquet("/")
  #several more queries, several involving joins, etc....
Run Code Online (Sandbox Code Playgroud)

此代码使用 spark SQL 查询,因此我未能成功创建包含所有 SQL 查询/函数的包装函数并将其传递给 foreach(不能将 sparkContext 或 sqlQuery 作为输入),而不是标准环形。

从技术上讲,这是一个具有分区的大型镶木地板文件,但一次读取并查询它的体积太大了;我需要在每个分区上运行这些功能。所以我只是在 PySpark 中运行一个常规的 python 循环,在每个循环中,我处理一个镶木地板分区(子目录)并编写相关的输出报告。

由于整个镶木地板文件的大小,不确定将所有代码包装在一个大的 mapPartition() 周围是否可行?

但是在几次循环之后,脚本由于内存错误而崩溃 - 特别是 Java 堆错误。(我已经确认循环崩溃的文件没有什么特别之处;它发生在第二个或第三个循环中读入的任何随机文件。)

Caused by: com.google.protobuf.ServiceException:     
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at    org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)

我意识到 Spark 并不是要循环运行,但是这些 SQL 查询对于标准的 Spark SQL 封装函数来说有点太复杂了,我们针对不同的聚合统计数据为每个文件编写了多个汇总报告。

有没有办法基本上清除每个循环索引末尾的内存?使用 sqlContext.dropTempTable() 删除任何注册的临时表并使用 sqlContext.clearCache() 清除缓存都没有帮助。如果我尝试停止 sparkContext 并在每个循环中重新启动它,我也会收到错误,因为某些进程尚未“结束”(似乎您曾经能够“优雅地”停止上下文,但我在当前的 PySpark 文档中找不到这个。)

我还应该注意,在处理完循环中的数据帧后,我没有对它们调用 unpersist(),但也没有对它们调用 persist();我只是重写每个循环中的数据帧(这可能是问题的一部分)。

我正在与我们的工程团队合作调整内存设置,但我们知道我们已经分配了足够多的内存来完成此脚本的一个循环(并且一个循环确实运行没有任何错误)。

任何建议都会有所帮助 - 包括可能比 Spark 更适合此用例的工具。我使用的是 Spark 1.6.1 版。

kpl*_*ney 2

更新:如果我在每个循环中完成 SQL 查询后对每个表调用 unpersist() ,那么循环可以成功地继续到下一次迭代,而不会出现内存问题。如上所述,.clearCache() 和单独删除临时表并不能解决问题。我猜这可行,因为虽然表来自 SparkSQL 查询,但它返回一个 RDD。

即使我没有对这些 RDD 调用 persist(),我也必须告诉 Spark 在下一个循环开始之前清除它们,以便我可以将新的 SQL 查询分配给这些相同的变量名称。