相关疑难解决方法(0)

为什么Spark作业会因org.apache.spark.shuffle.MetadataFetchFailedException而失败:在推测模式下缺少shuffle 0的输出位置?

我正在以推测模式运行Spark工作.我有大约500个任务和大约500个压缩1 GB gz的文件.我继续参加每项工作,完成1-2项任务,附加错误,然后重新运行数十次(阻止工作完成).

org.apache.spark.shuffle.MetadataFetchFailedException:缺少shuffle 0的输出位置

知道问题的含义是什么以及如何克服它?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
    at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
Run Code Online (Sandbox Code Playgroud)

apache-spark

73
推荐指数
5
解决办法
6万
查看次数

spark.sql.shuffle.partitions的最佳值应该是什么,或者在使用Spark SQL时如何增加分区?

嗨,我实际上使用Spark SQL hiveContext.sql(),它使用查询组,我遇到了OOM问题.因此,考虑将spark.sql.shuffle.partitions200的默认值增加到1000,但它没有帮助.请纠正我,如果我错了,这个分区将共享数据shuffle load,所以分区更少数据保持.请指导我是Spark新手.我正在使用Spark 1.4.0,我有大约1TB的未压缩数据,可以使用hiveContext.sql()group by queries 进行处理.

apache-spark apache-spark-sql

36
推荐指数
2
解决办法
3万
查看次数

如何避免Spark执行器丢失和纱线容器由于内存限制而导致它被杀死?

我有以下代码,hiveContext.sql()大部分时间都会触发.我的任务是我想创建几个表并在处理完所有hive表分区后插入值.

所以我首先show partitions在for循环中触发并使用它的输出,我调用一些创建表的方法(如果它不存在)并使用它插入它们hiveContext.sql.

现在,我们不能hiveContext在执行程序中执行,所以我必须在驱动程序的for循环中执行它,并且应该逐个串行运行.当我在YARN集群中提交这个Spark作业时,几乎所有的时间我的执行程序都因为shuffle未找到异常而丢失.

现在这种情况正在发生,因为YARN因为内存过载而杀死了我的执行程序.我不明白为什么,因为我为每个hive分区设置了一个非常小的数据集,但它仍然导致YARN杀死我的执行程序.

以下代码是否会并行执行所有操作并尝试同时容纳内存中的所有hive分区数据?

public static void main(String[] args) throws IOException {   
    SparkConf conf = new SparkConf(); 
    SparkContext sc = new SparkContext(conf); 
    HiveContext hc = new HiveContext(sc); 

    DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")"); 
  
    Row[] rowArr = partitionFrame.collect(); 
    for(Row row : rowArr) { 
        String[] splitArr = row.getString(0).split("/"); 
        String server = splitArr[0].split("=")[1]; 
        String date =  splitArr[1].split("=")[1]; 
        String csvPath = "hdfs:///user/db/ext/"+server+".csv"; 
        if(fs.exists(new Path(csvPath))) { 
            hiveContext.sql("ADD FILE " + csvPath); 
        } 
        createInsertIntoTableABC(hc,entity, …
Run Code Online (Sandbox Code Playgroud)

memory executors hadoop-yarn apache-spark apache-spark-sql

16
推荐指数
1
解决办法
2万
查看次数

什么是在Spark SQL中连接大表的优化方法

我需要使用Spark SQL或Dataframe API连接表.需要知道实现它的优化方式.

场景是:

  1. 所有数据都以ORC格式存在于Hive中(基本数据帧和参考文件).
  2. 我需要加入一个从Hive读取的基本文件(Dataframe)和11-13其他参考文件来创建一个大的内存结构(400列)(大小约1 TB)

实现这一目标的最佳方法是什么?如果有人遇到类似的问题,请分享您的经验.

apache-spark apache-spark-sql

10
推荐指数
3
解决办法
1万
查看次数

广播哈希联接-迭代

当我们有一个足够小以适合内存的数据帧时,我们在Spark中使用广播哈希联接。当小数据框的大小小于以下时,spark.sql.autoBroadcastJoinThreshold 我对此几乎没有疑问。

我们提示广播的小数据帧的生命周期是多少?它会在内存中保留多长时间?我们如何控制它?

例如,如果我使用广播哈希连接将大型数据框与小型数据框连接了两次。第一次执行联接时,它将把小数据帧广播到工作节点并执行联接,同时避免大数据帧数据的混排。

我的问题是,执行者将保留广播数据帧的副本多长时间?它会保留在内存中直到会话结束吗?否则,一旦我们采取任何措施,它将被清除。我们可以控制还是清除它?或者我只是在错误的方向上思考...

apache-spark apache-spark-sql pyspark

8
推荐指数
1
解决办法
755
查看次数

高效的pyspark加入

我已经阅读了很多有关如何在pyspark中进行有效联接的内容。我发现实现高效联接的方法基本上是:

  • 如果可以,请使用广播加入。(我通常不能,因为数据帧太大)
  • 考虑使用非常大的群集。(我宁愿不是因为$$$)。
  • 使用相同的分区程序

最后一个是我想尝试的,但是我找不到在pyspark中实现它的方法。我试过了:

df.repartition(numberOfPartitions,['parition_col1','partition_col2'])
Run Code Online (Sandbox Code Playgroud)

但这无济于事,直到我停止它仍需要花费很长时间,因为在最后的几项工作中卡住了火花。

因此,如何在pyspark中使用相同的分区程序并加快连接速度,甚至摆脱永远需要的时间?我需要使用哪个代码?

PD:即使在stackoverflow上,我也查看了其他文章,但是我仍然看不到代码。

apache-spark pyspark

3
推荐指数
1
解决办法
1419
查看次数