为什么Spark尝试发送GetMapOutputStatuses时报告"与MapOutputTracker通信时出错"?

Dan*_*don 16 scala apache-spark-1.3

我正在使用Spark 1.3对大量数据进行聚合.这项工作包括4个步骤:

  1. 读取一个大的(1TB)序列文件(对应1天的数据)
  2. 过滤掉大部分内容并获得大约1GB的随机写入
  3. keyBy客户
  4. aggregateByKey()到为该客户构建配置文件的自定义结构,对应于每个客户的HashMap [Long,Float].Long键是唯一的,永远不会超过50K不同的条目.

我用这个配置运行它:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \
Run Code Online (Sandbox Code Playgroud)

并收到此错误:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
Run Code Online (Sandbox Code Playgroud)

这项工作和逻辑已经证明可以使用一个小型测试装置,我甚至可以在某些日期运行这项工作,但对其他日期则不行.我已经用Google搜索并发现"与MapOutputTracker通信时出错"的提示与内部Spark消息有关,但我已经增加了"spark.akka.frameSize","spark.akka.timeout"和"spark.akka.askTimeout"(这最后一个甚至没有出现在Spark文档中,但在Spark邮件列表中提到过,但无济于事.在30秒仍有一些超时,我不知道如何识别或修复.

我认为没有理由因数据大小而失败,因为过滤操作和aggregateByKey执行本地部分聚合这一事实应足以解决数据大小问题.任务数量为16K(从原始输入自动完成),远远超过运行此任务的800个核心,在100个执行器上,因此它不像通常的"增量分区"提示那么简单.任何线索将不胜感激!谢谢!

spa*_*ler 6

我有一个类似的问题,我的工作可以使用较小的数据集正常工作,但是会因较大的数据集而失败.

经过大量配置更改后,我发现更改驱动程序内存设置比更改执行程序内存设置有更大的影响.使用新的垃圾收集器也有很大帮助.我对3个集群使用以下配置,每个集群有40个核心.希望以下配置有帮助:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -  
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g   
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
Run Code Online (Sandbox Code Playgroud)