Spark shuffle错误org.apache.spark.shuffle.FetchFailedException:FAILED_TO_UNCOMPRESS(5)

Non*_*one 5 apache-spark

我有一份处理大量数据的工作.此作业经常运行没有任何错误,但偶尔会抛出此错误.我正在使用Kyro Serializer.

我正在使用纱线集群运行Spark 1.2.0.

完整的堆栈跟踪:

org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:89)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
    at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
    at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1164)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:299)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:299)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
    ... 24 more
Run Code Online (Sandbox Code Playgroud)

小智 0

我认为最好使用其他压缩编解码器,例如 lz4。为此,请在 conf/spark-defaults.conf 中添加新行:spark.io.compression.codec lz4 将压缩编解码器从 snappy(默认)更改为 lz4 但是,此问题报告为错误,并已在Apache Jira:https://issues.apache.org/jira/browse/SPARK-4105