相关疑难解决方法(0)

Spark java.lang.OutOfMemoryError:Java堆空间

我的集群:1个主服务器,11个从服务器,每个节点有6 GB内存.

我的设置:

spark.executor.memory=4g, Dspark.akka.frameSize=512
Run Code Online (Sandbox Code Playgroud)

这是问题所在:

首先,我从HDFS到RDD读取了一些数据(2.19 GB):

val imageBundleRDD = sc.newAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)

其次,在这个RDD上做点什么:

val res = imageBundleRDD.map(data => {
                               val desPoints = threeDReconstruction(data._2, bg)
                                 (data._1, desPoints)
                             })
Run Code Online (Sandbox Code Playgroud)

最后,输出到HDFS:

res.saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)

当我运行我的程序时,它显示:

.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO …
Run Code Online (Sandbox Code Playgroud)

out-of-memory apache-spark

208
推荐指数
9
解决办法
21万
查看次数

Spark:增加分区数量而不会导致shuffle?

当减少可以使用的分区数量时coalesce,这很好,因为它不会导致混乱并且似乎立即工作(不需要额外的工作阶段).

我有时会反其道而行之,但会repartition引发一场洗牌.我认为,在几个月前,我居然通过了这方面的工作CoalescedRDDbalanceSlack = 1.0-所以会发生什么是它将把一个分区,这样生成的磁盘分区的位置,所有的同一节点(这么小的净IO)上.

这种功能在Hadoop中是自动的,只需调整分割大小即可.除非减少分区数量,否则它似乎不会在Spark中以这种方式工作.我认为解决方案可能是编写一个自定义分区器以及我们定义的自定义RDD getPreferredLocations......但我认为这样做是如此简单和常见,确实必须有一个直接的方法来做到这一点?

事情尝试:

.set("spark.default.parallelism", partitions)在我的SparkConf,并且在阅读镶木地板的情况下,我已经尝试过sqlContext.sql("set spark.sql.shuffle.partitions= ...,在1.0.0上导致错误并且不是我想要的,我希望分区号在所有类型的工作中改变,而不仅仅是洗牌.

scala apache-spark

19
推荐指数
1
解决办法
1万
查看次数

如何在Spark SQL中加入大数据帧?(最佳实践,稳定性,性能)

在Spark SQL中加入大数据帧时,我得到的错误与丢失输出位置的shuffle相同.建议设置MEMORY_AND_DISK和/或spark.shuffle.memoryFraction 0.但是,在Spark> = 1.6.0中不推荐使用spark.shuffle.memoryFraction,如果我没有缓存任何RDD或Dataframe,设置MEMORY_AND_DISK应该没有帮助,对吧?此外,我正在获得许多其他WARN日志和任务重试,这使我认为工作不稳定.

因此,我的问题是:

  • 在Spark SQL> = 1.6.0中加入大型数据帧的最佳实践是什么?

更具体的问题是:

  • 如何调整执行程序的数量spark.sql.shuffle.partitions以实现更好的稳定性/性能?
  • 如何在并行级别(执行程序/核心数量)和分区数量之间找到适当的平衡点?我发现增加执行程序的数量并不总是解决方案,因为它可能会因网络流量而产生I/O读取超时异常.
  • 是否还有其他相关参数需要针对此目的进行调整?
  • 我的理解是,加入存储为ORCParquet的数据可以提供比文本或Avro更好的连接操作性能.Parquet和ORC之间有显着差异吗?
  • SQLContextHiveContext有关于连接操作的稳定性/性能的优势吗?
  • 当连接中涉及的数据帧先前是registerTempTable()saveAsTable()时,性能/稳定性是否存在差异?

到目前为止,我正在使用这个答案本章作为起点.还有一些与此主题相关的stackoverflow页面.然而,我还没有找到这个热门问题的全面答案.

提前致谢.

performance join apache-spark apache-spark-sql spark-dataframe

11
推荐指数
1
解决办法
4242
查看次数

Spark写Parquet到S3最后一项任务需要永远

我正在写一个从DataFrame到S3的镶木地板文件.当我查看Spark UI时,我可以看到所有任务,但1完成了写作阶段(例如199/200).最后一项任务似乎需要永远完成,并且通常由于超出执行程序内存限制而失败.

我想知道最后一项任务发生了什么.如何优化呢?谢谢.

apache-spark parquet apache-spark-sql

8
推荐指数
2
解决办法
2612
查看次数

Spark数据帧组的平均值和中位数未完成

我使用Spark sql数据帧执行groupby操作,然后计算每个组的数据的平均值和中位数.原始数据量约为1 TB.

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"), 
        avg("Error").as("MeanError"), 
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"), 
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"), 
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
    filter($"Count" > 1000)


df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")
Run Code Online (Sandbox Code Playgroud)

当我运行该查询时,我的工作陷入困境并且无法完成.我该如何调试问题?是否存在导致groupby()卡住的关键不平衡?

apache-spark apache-spark-sql spark-dataframe

6
推荐指数
1
解决办法
567
查看次数