Apache Spark OutOfMemoryError(HeapSpace)

Tw *_*Nus 5 apache-spark apache-spark-sql pyspark pyspark-sql

我有一个大约5M行x20列的数据集,包含groupID和rowID.我的目标是检查(某些)列是否包含组内缺少(空)值的固定分数(例如,50%).如果找到,则该组的整个列设置为missing(null).

df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...}  # currently len(check_columns) = 8

for col, _ in check_columns.items():
    total = (df
             .groupBy('groupID').count()
             .toDF('groupID', 'n_total')
             )

    missing = (df
               .where(F.col(col).isNull())
               .groupBy('groupID').count()
               .toDF('groupID', 'n_missing')
               )
    # count_missing = count_missing.persist()  # PERSIST TRY 1
    # print('col {} found {} missing'.format(col, missing.count()))  # missing.count() is b/w 1k-5k

    poor_df = (total
               .join(missing, 'groupID')
               .withColumn('freq', F.col('n_missing') / F.col('n_total'))
               .where(F.col('freq') > 0.5)
               .select('groupID')
               .toDF('poor_groupID')
               )

    df = (df
          .join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
          .withColumn(col, (F.when(F.col('poor_groupID').isNotNull(), None)
                            .otherwise(df[col])
                            )
                    )
        .select(df.columns)
        )

    stats = (missing
             .withColumnRenamed('n_missing', 'cnt')
             .collect()  # FAIL 1
             )

    # df = df.persist()  # PERSIST TRY 2

print(df.count())  # FAIL 2
Run Code Online (Sandbox Code Playgroud)

我最初分配1G spark.driver.memory和4G spark.executor.memory,最终增加到spark.driver.memory10G.

问题: 循环在第一次迭代期间像魅力一样运行,但到最后,在第6或第7次迭代时,我看到我的CPU利用率下降(使用1而不是6个核心).与此同时,一次迭代的执行时间显着增加.在某些时候,我得到一个OutOfMemory错误:

  • spark.driver.memory < 4G:at collect()(FAIL 1)
  • 4G <= spark.driver.memory < 10G:在count()步骤(FAIL 2)

FAIL 1案例的堆栈跟踪(相关部分):

[...]
py4j.protocol.Py4JJavaError: An error occurred while calling o1061.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
[...]
Run Code Online (Sandbox Code Playgroud)

执行程序UI不反映过多的内存使用情况(它显示驱动程序使用的内存<50k,执行程序<1G).Spark metrics system(app-XXX.driver.BlockManager.memory.memUsed_MB)也没有:它显示600M到1200M的已用内存,但总是> 300M剩余内存.(这表明2G驱动程序内存应该这样做,但事实并非如此.)

首先处理哪个列也无关紧要(因为它是a上的循环dict(),它可以是任意顺序).

我的问题是:

  • 导致OutOfMemory错误的原因是什么并非所有可用的CPU核心都用于最终?
  • spark.driver.memory当我从执行器转移到驱动程序只需几KB时,为什么需要10G ?

一些(一般)问题,以确保我理解正确的事情:

  • 如果我收到OOM错误,正确看的地方几乎总是驱动程序(b/c执行程序溢出到磁盘)?
  • 为什么会count()导致OOM错误 - 我认为这个操作只会占用exector上的资源(向驱动程序提供几个字节)?
  • 上面提到的内存指标(指标系统,UI)是否正确?

BTW:我在独立模式下运行Spark 2.1.0.

更新2017-04-28

为了进一步深入,我为驱动程序启用了堆转储:

cfg = SparkConfig()
cfg.set('spark.driver.extraJavaOptions', '-XX:+HeapDumpOnOutOfMemoryError')
Run Code Online (Sandbox Code Playgroud)

我跑了它8Gspark.driver.memory和我分析与Eclipse MAT堆转储.事实证明,有两个相当大的类(每个~4G):

java.lang.Thread
    - char (2G)
    - scala.collection.IndexedSeqLike
        - scala.collection.mutable.WrappedArray (1G)
    - java.lang.String (1G)

org.apache.spark.sql.execution.ui.SQLListener
    - org.apache.spark.sql.execution.ui.SQLExecutionUIData 
      (various of up to 1G in size)
        - java.lang.String
    - ...
Run Code Online (Sandbox Code Playgroud)

我尝试关闭UI,使用

cfg.set('spark.ui.enabled', 'false')
Run Code Online (Sandbox Code Playgroud)

这使得UI不可用,但对OOM错误没有帮助.此外,我尝试使用UI来保持较少的历史记录

cfg.set('spark.ui.retainedJobs', '1')
cfg.set('spark.ui.retainedStages', '1')
cfg.set('spark.ui.retainedTasks', '1')
cfg.set('spark.sql.ui.retainedExecutions', '1')
cfg.set('spark.ui.retainedDeadExecutors', '1')
Run Code Online (Sandbox Code Playgroud)

这也没有帮助.

更新2017-05-18

我发现了Spark的pyspark.sql.DataFrame.checkpoint方法.这就像是persist摆脱了数据帧的血统.因此,它有助于规避上述问题.