为什么Spark作业会因org.apache.spark.shuffle.MetadataFetchFailedException而失败:在推测模式下缺少shuffle 0的输出位置?

dot*_*tan 73 apache-spark

我正在以推测模式运行Spark工作.我有大约500个任务和大约500个压缩1 GB gz的文件.我继续参加每项工作,完成1-2项任务,附加错误,然后重新运行数十次(阻止工作完成).

org.apache.spark.shuffle.MetadataFetchFailedException:缺少shuffle 0的输出位置

知道问题的含义是什么以及如何克服它?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
Run Code Online (Sandbox Code Playgroud)

Jor*_*ren 43

当我向工作节点提供更多内存时,这发生在我身上.由于它没有交换,因此在尝试存储对象进行洗牌时火花崩溃而没有更多的内存.

解决方案是添加交换,或者将worker/executor配置为使用更少的内存以及使用MEMORY_AND_DISK存储级别来保留多个持久性.

  • 嗨@Joren这不是比赛.OP问题是执行者没有足够的内存来存储随机输出.对您有用的不是减少执行程序内存,而是使用MEMORY_AND_DISK存储级别,这消除了执行程序的内存限制.OP也没有说明他对执行人有多少资源. (11认同)
  • 如果节点(内存)上有资源,可以尝试增加spark executor内存.如果你也关心性能,我会先尝试一下. (3认同)
  • 尝试将 spark.driver.overhead.memory 和 spark.executor.overhead.memory 更改为大于 384(默认)的值,它应该可以工作。您可以使用 1024 MB 或 2048 MB。 (3认同)

kus*_*eep 21

当特定 Spark 分区中有大量数据时,就会出现该错误。解决这个问题的方法是执行以下步骤:

  1. 增加 shuffle-partitions 的数量: --conf spark.sql.shuffle.partitions=<some-high-number-let say 200>
  2. 在正常情况下,分区数应设置为执行器数*每个执行器的核心数。但如果我们有大量数据,这种分区方案就会出现问题。请参阅下面的示例。

假设我们有以下数据,并且有 3 个执行器,每个执行器 1 个核心,因此本例中的分区(物理分区)数量为 3

 Data:  1,2,3,4,5,6,7,8,9,13,16,19,22

 Partitions:  1,2,3 
 Distribution of Data in Partitions (partition logic based on modulo by 3)

          1-> 1,4,7,13,16,19,22
          2-> 2,5,8
          3->3,6,9
 From above we can see that there is data skew, partition 1 is having more 
 data than the rest
 
 Now lets increase the number of partitions to : number of executors * number 
 of cores per executor*2 = 6 (in our example. These 6 partitions will be 
 logical partitions.Now each executor will be having 2 logical partitions 
 instead of 1 .Data partitioning will be based on modulo 6 instead of 3.

 Partitions of data in each executor:

        1->(0,1)->1,6,7,13,19
        2->(2,3)-->2,3,8,9
        3->(4,5)->4,5,16,22
The increase in logical partitions leads to fair partitioning.
Run Code Online (Sandbox Code Playgroud)
  1. 增加 shuffle 分区数量后可以做的下一件事是,如果不持久或缓存任何数据帧,则减少 Spark 内存的存储部分。默认情况下,存储部分是 0.5 ,执行部分也是 0.5 。要减少存储部分,您可以在 Spark-submit 命令中设置以下配置

        --conf spark.memory.storageFraction=0.3
    
    Run Code Online (Sandbox Code Playgroud)

4.) 除了上述两件事之外,您还可以设置执行器开销内存。--conf Spark.executor.memoryOverhead=2g

 This is off-heap memory that is used for Virtual Machine overheads, interned 
 strings etc.
Run Code Online (Sandbox Code Playgroud)

5.) 除此之外,您还可以通过将 maxFilesPerTrigger 设置为较小的值(例如 10)来限制特定微批次中处理的文件数量。

  • 当 200 是默认值时,为什么要说“some-high-number-let say 200”?https://spark.apache.org/docs/latest/sql-performance-tuning.html (2认同)

Not*_*ist 14

我们与Spark有类似的错误,但我不确定它与您的问题有关.

我们使用JavaPairRDD.repartitionAndSortWithinPartitions了100GB数据,并且它与您的应用程序类似地失败.然后我们查看了特定节点上的Yarn日志,发现我们遇到了某种内存不足问题,因此Yarn中断了执行.我们的解决方案是改变/添加spark.shuffle.memoryFraction 0.../spark/conf/spark-defaults.conf.这使我们能够以这种方式处理更大量(但不幸的是无限量)的数据量.


xpl*_*dev 11

我在我的3台机器YARN集群上遇到了同样的问题.我一直在改变RAM,但问题仍然存在.最后,我在日志中看到以下消息:

17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
Run Code Online (Sandbox Code Playgroud)

在此之后,有这样的信息:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
Run Code Online (Sandbox Code Playgroud)

我修改了spark-defaults.conf中的属性,如下所示:

spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
Run Code Online (Sandbox Code Playgroud)

而已!我的工作在此之后成功完成.

  • 在 spark 文档中,它说:`spark.executor.heartbeatInterval 应该明显小于 spark.network.timeout`。因此,将这两个设置为相同的值可能不是最好的主意。 (2认同)

Den*_*sLi 5

在Spark Web UI中,如果有类似的信息Executors lost,那么你必须检查yarn日志,确定你的容器是否已被杀死。

如果容器被杀死,则可能是由于内存不足。

如何找到纱线日志中的关键信息?例如,可能会出现一些这样的警告:

Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Run Code Online (Sandbox Code Playgroud)

在这种情况下,建议您应该增加spark.yarn.executor.memoryOverhead.