为什么 Pyspark 作业在没有任何特定错误的情况下在过程中消失

Sid*_*idd 1 apache-spark apache-spark-sql pyspark

专家们,我注意到生产中的 Pyspark 作业之一(在 YARN 集群模式下运行)有一件奇怪的事情。执行大约一个小时 +(大约 65-75 分钟)后,它就会消失,而不会抛出任何特定的错误消息。我们已经分析了大约 2 周的 YARN 日志,其中没有特别的错误,它只是在执行 ETL 操作(读/写 hive 表、执行简单映射、修剪、lambda 操作等)时在中间死亡,没有任何错误要指出的特定代码段。有时重新运行可以修复它,有时需要多次重新运行。代码已优化, spark-submit --conf 具有所有正确优化的选项。正如我们之前提到的,它对于大约 30 种其他具有非常好的性能统计数据的应用程序来说绝对是完美的。这些是我们所有的选择——

spark-submit --conf spark.yarn.maxAppAttempts=1 --conf spark.sql.broadcastTimeout=36000 --conf spark.dynamicAllocation.executorIdleTimeout=1800 --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.initialExecutors=8 --conf spark.dynamicAllocation.maxExecutors=32 --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.kryoserializer.buffer.max=512m --driver-memory 2G --executor-memory 8G --executor-cores 2 --deploy-mode cluster --master yarn
Run Code Online (Sandbox Code Playgroud)

我们想检查是否需要更改某些驱动器配置来解决此问题?或者在 Spark Cluster 模式下有一些可以增加的自动超时?我们在 Python 2.7 中使用 Spark 1.6

错误看起来像(有几条消息说 -

ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
Run Code Online (Sandbox Code Playgroud)

但是当它遇到驱动程序错误时它会失败(最终发生)-

ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down
Run Code Online (Sandbox Code Playgroud)

这是日志-

19/10/24 16:17:03 INFO compress.CodecPool: Got brand-new compressor [.gz]
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000323_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000323
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000323_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 323.0 in stage 152.0 (TID 27419). 2163 bytes result sent to driver
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000135_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000135
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000135_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 135.0 in stage 152.0 (TID 27387). 2163 bytes result sent to driver
19/10/24 16:18:04 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
19/10/24 16:18:04 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:18:04 INFO util.ShutdownHookManager: Shutdown hook called

19/10/24 16:21:12 INFO executor.Executor: Finished task 41.0 in stage 163.0 (TID 29954). 2210 bytes result sent to driver
19/10/24 16:21:12 INFO executor.Executor: Finished task 170.0 in stage 163.0 (TID 29986). 2210 bytes result sent to driver
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30047
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30079
19/10/24 16:21:13 INFO executor.Executor: Running task 10.0 in stage 165.0 (TID 30047)
19/10/24 16:21:13 INFO executor.Executor: Running task 42.0 in stage 165.0 (TID 30079)
19/10/24 16:21:13 INFO spark.MapOutputTrackerWorker: Updating epoch to 56 and clearing cache
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 210
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 29.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Reading broadcast variable 210 took 3 ms
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210 stored as values in memory (estimated size 83.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO executor.Executor: Finished task 10.0 in stage 165.0 (TID 30047). 931 bytes result sent to driver
19/10/24 16:21:13 INFO executor.Executor: Finished task 42.0 in stage 165.0 (TID 30079). 931 bytes result sent to driver
19/10/24 16:21:15 WARN executor.CoarseGrainedExecutorBackend: An unknown (rxxxxxx1.hadoop.com:XXXXX) driver disconnected.
19/10/24 16:21:15 ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down.
19/10/24 16:21:15 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:21:15 INFO util.ShutdownHookManager: Shutdown hook called
Run Code Online (Sandbox Code Playgroud)

谢谢,希德

Gsq*_*are 7

如果没有任何明显的堆栈跟踪,最好从两个角度思考问题:要么是代码问题,要么是数据问题

无论哪种情况,您都应该首先为驱动程序提供充足的内存,以排除可能的原因。增加driver.memorydriver.memoryOverhead直到您诊断出问题。

常见代码问题:

  1. 太多的转换会导致血统变得太大。如果数据帧上发生任何类型的迭代操作,那么最好通过checkpoint在两者之间执行 a 来截断 DAG 。在 Spark 2.x 中,您可以dataFrame.checkpoint()直接调用而不必访问RDD. @Sagar 的回答还描述了如何为 Spark 1.6 执行此操作

  2. 试图广播太大的数据帧。这通常会导致 OOM 异常,但有时只会导致作业看起来卡住了。broadcast如果您明确这样做,则解决方案是不调用。否则请检查您是否已设置spark.sql.autoBroadcastJoinThreshold为某个自定义值并尝试降低该值或完全禁用广播(设置-1)。

  3. 没有足够的分区会导致每个任务都运行得很热。诊断此问题的最简单方法是检查 Spark UI 上的阶段视图并查看每个任务读取和写入的数据大小。理想情况下,这应该在 100MB-500MB 范围内。否则增加spark.sql.shuffle.partitions并设置spark.default.parallelism为比默认值 200 更高的值。

常见数据问题:

  1. 数据倾斜。由于您的作业在特定工作负载下失败,因此特定作业中的数据可能会出现偏差。通过检查任务完成的中位时间是否与 75 个百分点相当,而后者与 Spark UI 中舞台视图上的 90 个百分点相当,以此来诊断这一点。纠正数据偏差的方法有很多,但我认为最好的方法是编写一个自定义的连接函数,在连接之前对连接键进行加盐处理。这以恒定大小的数据爆炸为代价将倾斜分区拆分为几个较小的分区。

  2. 输入文件格式或文件数。如果您的输入文件没有分区并且您只进行窄转换(那些不会导致数据混洗的转换),那么您的所有数据都将通过单个执行程序运行,而不会真正从分布式集群设置中受益。通过检查在管道的每个阶段创建了多少任务,从 Spark UI 诊断此问题。它应该是你的spark.default.parallelism价值的顺序。如果没有,则.repartition(<some value>)在任何转换之前的数据读取步骤之后立即执行。如果文件格式为 CSV(不理想),则multiLine除非在特定情况下需要,否则请验证您是否已禁用,否则这将强制单个执行程序读取整个 csv 文件。

调试愉快!