I'm running a Spark job in speculation mode. I have about 500 tasks and about 500 gzip-compressed files of 1 GB each. In every job, one or two tasks keep failing with the error below and are then re-run dozens of times, which prevents the job from ever completing.
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
Any idea what this error means and how to get past it?
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Jor*_*ren 43
This happened to me when I gave the worker node more memory than it actually has. Since it had no swap, Spark crashed while trying to store objects for shuffling once no more memory was left.
The solution was to add swap, or to configure the worker/executor to use less memory, in addition to using the MEMORY_AND_DISK storage level for the datasets you persist.
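For illustration only, a minimal Scala sketch of that setup might look like the one below; the input path, executor memory size and parsing step are placeholders, not values from the original answer.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Keep executor memory below what the worker node physically has,
// so the OS and other daemons are not starved.
val spark = SparkSession.builder()
  .appName("shuffle-oom-workaround")      // placeholder app name
  .config("spark.executor.memory", "4g")  // placeholder size, below physical RAM
  .getOrCreate()

val lines = spark.sparkContext.textFile("hdfs:///path/to/input/*.gz")

// MEMORY_AND_DISK spills blocks that do not fit in memory to local disk
// instead of holding everything on-heap.
val parsed = lines.map(_.split("\t")).persist(StorageLevel.MEMORY_AND_DISK)

println(parsed.count())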
kus*_*eep 21
This error occurs when there is a large amount of data in a particular Spark partition. The way to solve it is to follow the steps below.
Suppose we have the following data, and that we have 3 executors with 1 core each, so the number of (physical) partitions in this case is 3.
Data: 1,2,3,4,5,6,7,8,9,13,16,19,22
Partitions: 1,2,3
Distribution of data across partitions (partitioning logic: modulo 3)

1 -> 1,4,7,13,16,19,22
2 -> 2,5,8
3 -> 3,6,9

From the above we can see that there is data skew: partition 1 holds more
data than the rest.

Now let's increase the number of partitions to: number of executors * number
of cores per executor * 2 = 6 (in our example). These 6 partitions will be
logical partitions. Each executor will now hold 2 logical partitions instead
of 1, and data will be partitioned modulo 6 instead of modulo 3.

Partitions of data on each executor:
1 -> (0,1) -> 1,6,7,13,19
2 -> (2,3) -> 2,3,8,9
3 -> (4,5) -> 4,5,16,22

The increase in logical partitions leads to fairer partitioning.
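As a rough sketch (not part of the original answer), the same idea in Scala is to raise the shuffle parallelism before the wide operation; the local master, app name and toy dataset below are assumptions for illustration.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("skew-demo")    // placeholder app name
  .master("local[3]")      // stands in for 3 executors with 1 core each
  .getOrCreate()

// executors * cores per executor * 2 = 6 shuffle partitions, as in the example above.
spark.conf.set("spark.sql.shuffle.partitions", "6")

// For plain RDDs, repartition explicitly before the shuffle-heavy step.
val data = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 13, 16, 19, 22))
val repartitioned = data.repartition(6)

// Print the contents of each partition; Spark's own partitioner decides the
// actual placement, so it will not match the modulo walkthrough exactly.
repartitioned.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i -> ${part.mkString(",")}")
}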
The next thing you can do after increasing the number of shuffle partitions, if you are not persisting or caching any DataFrame, is to reduce the storage portion of Spark memory. By default the storage fraction is 0.5 and the execution fraction is also 0.5. To reduce the storage fraction you can set the following configuration in your spark-submit command:
--conf spark.memory.storageFraction=0.3
4.) Apart from the above two things, you can also set the executor overhead memory: --conf spark.executor.memoryOverhead=2g
This is off-heap memory that is used for virtual machine overheads, interned strings, etc. A combined spark-submit example for these two settings is sketched below.
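Putting the storage-fraction and overhead settings together, a spark-submit invocation might look like the following; the class name and jar are placeholders, only the two --conf values come from the steps above.

# Placeholder class and jar names.
spark-submit \
  --class com.example.MyJob \
  --conf spark.memory.storageFraction=0.3 \
  --conf spark.executor.memoryOverhead=2g \
  my-job.jar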
5.) Apart from this, you can also limit the number of files processed in a particular micro-batch by setting maxFilesPerTrigger to a smaller value, say 10, as sketched below.
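A minimal sketch for a Structured Streaming file source, assuming a text input directory and a console sink (both placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("micro-batch-cap").getOrCreate()

// Each micro-batch picks up at most 10 new files from the source directory.
val stream = spark.readStream
  .option("maxFilesPerTrigger", "10")
  .text("hdfs:///path/to/incoming/")

val query = stream.writeStream.format("console").start()
query.awaitTermination()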
Not*_*ist 14
We had a similar error with Spark, but I'm not sure it is related to your problem.
We used JavaPairRDD.repartitionAndSortWithinPartitions on 100 GB of data and it kept failing similarly to your application. Then we looked at the Yarn logs on the specific nodes and found out that we had some kind of out-of-memory problem, so Yarn killed the execution. Our solution was to change/add spark.shuffle.memoryFraction 0 in .../spark/conf/spark-defaults.conf. That allowed us to handle a much larger (though unfortunately not unlimited) amount of data this way.
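For reference, the corresponding spark-defaults.conf entry is the single line below. Note that spark.shuffle.memoryFraction is a legacy setting; on Spark 1.6 and later it only takes effect when spark.memory.useLegacyMode is set to true.

spark.shuffle.memoryFraction   0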
xpl*_*dev 11
I ran into the same problem on my 3-machine YARN cluster. I kept changing the RAM allocation but the issue persisted. Finally I saw the following messages in the logs:
17/02/20 13:11:02 WARN spark.HeartbeatReceiver: Removing executor 2 with no recent heartbeats: 1006275 ms exceeds timeout 1000000 ms
17/02/20 13:11:02 ERROR cluster.YarnScheduler: Lost executor 2 on 1worker.com: Executor heartbeat timed out after 1006275 ms
After that, there was this message:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 67
I modified the properties in spark-defaults.conf as follows:
spark.yarn.scheduler.heartbeat.interval-ms 7200000
spark.executor.heartbeatInterval 7200000
spark.network.timeout 7200000
That was it! My job completed successfully after this.
In the Spark Web UI, if there is information such as Executors lost, then you have to check the Yarn logs and make sure your container has not been killed.
If the container was killed, it is probably due to a lack of memory.
How do you find the key information in the Yarn logs? For example, there may be warnings like this:
Container killed by YARN for exceeding memory limits. 2.5 GB of 2.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
In this case, it suggests you should increase spark.yarn.executor.memoryOverhead.
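To pull those container logs, the YARN CLI is usually the quickest route; the application id below is a placeholder.

# Fetch aggregated logs for a finished application and search for memory-related kills.
yarn logs -applicationId application_1487567394850_0012 | grep -i "memory limits"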