我的集群:1个主服务器,11个从服务器,每个节点有6 GB内存.
我的设置:
spark.executor.memory=4g, Dspark.akka.frameSize=512
Run Code Online (Sandbox Code Playgroud)
这是问题所在:
首先,我从HDFS到RDD读取了一些数据(2.19 GB):
val imageBundleRDD = sc.newAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
其次,在这个RDD上做点什么:
val res = imageBundleRDD.map(data => {
val desPoints = threeDReconstruction(data._2, bg)
(data._1, desPoints)
})
Run Code Online (Sandbox Code Playgroud)
最后,输出到HDFS:
res.saveAsNewAPIHadoopFile(...)
Run Code Online (Sandbox Code Playgroud)
当我运行我的程序时,它显示:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO …Run Code Online (Sandbox Code Playgroud) 当减少可以使用的分区数量时coalesce,这很好,因为它不会导致混乱并且似乎立即工作(不需要额外的工作阶段).
我有时会反其道而行之,但会repartition引发一场洗牌.我认为,在几个月前,我居然通过了这方面的工作CoalescedRDD有balanceSlack = 1.0-所以会发生什么是它将把一个分区,这样生成的磁盘分区的位置,所有的同一节点(这么小的净IO)上.
这种功能在Hadoop中是自动的,只需调整分割大小即可.除非减少分区数量,否则它似乎不会在Spark中以这种方式工作.我认为解决方案可能是编写一个自定义分区器以及我们定义的自定义RDD getPreferredLocations......但我认为这样做是如此简单和常见,确实必须有一个直接的方法来做到这一点?
事情尝试:
.set("spark.default.parallelism", partitions)在我的SparkConf,并且在阅读镶木地板的情况下,我已经尝试过sqlContext.sql("set spark.sql.shuffle.partitions= ...,在1.0.0上导致错误并且不是我想要的,我希望分区号在所有类型的工作中改变,而不仅仅是洗牌.
在Spark SQL中加入大数据帧时,我得到的错误与丢失输出位置的shuffle相同.建议设置MEMORY_AND_DISK和/或spark.shuffle.memoryFraction 0.但是,在Spark> = 1.6.0中不推荐使用spark.shuffle.memoryFraction,如果我没有缓存任何RDD或Dataframe,设置MEMORY_AND_DISK应该没有帮助,对吧?此外,我正在获得许多其他WARN日志和任务重试,这使我认为工作不稳定.
因此,我的问题是:
更具体的问题是:
到目前为止,我正在使用这个答案和本章作为起点.还有一些与此主题相关的stackoverflow页面.然而,我还没有找到这个热门问题的全面答案.
提前致谢.
performance join apache-spark apache-spark-sql spark-dataframe
我正在写一个从DataFrame到S3的镶木地板文件.当我查看Spark UI时,我可以看到所有任务,但1完成了写作阶段(例如199/200).最后一项任务似乎需要永远完成,并且通常由于超出执行程序内存限制而失败.
我想知道最后一项任务发生了什么.如何优化呢?谢谢.
我使用Spark sql数据帧执行groupby操作,然后计算每个组的数据的平均值和中位数.原始数据量约为1 TB.
val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
count("Error").as("Count"),
avg("Error").as("MeanError"),
callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
filter($"Count" > 1000)
df_result.orderBy(asc("MeanError")).limit(5000)
.write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")
Run Code Online (Sandbox Code Playgroud)
当我运行该查询时,我的工作陷入困境并且无法完成.我该如何调试问题?是否存在导致groupby()卡住的关键不平衡?