为什么Spark选择在单个节点上完成所有工作?

Aub*_*nha 6 hadoop bigdata hadoop-yarn apache-spark

我在执行Spark作业时遇到困难,大约一半的时间,它将选择在单个节点上处理所有数据,然后该节点的内存不足并死亡。

问题:如何确保这种情况不会一次发生?

该系统在Yarn上使用Spark 1.6.0,并从Hadoop 2.6数据存储中提取数据,所有代码均用Java编写。我在具有十二个节点(Amazon)的整个集群中动态分配资源。

DAG比较简单:

RDD --> mapToPair \  
                   coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /
Run Code Online (Sandbox Code Playgroud)

当它正确运行时,所有任务将在整个群集中得到很好的分配,整个工作大约需要20分钟。我们称其为“良好行为”。但是,有时候,flatMapToPair阶段实际上在单个执行程序中运行。我们称此为“不良行为”

当我为一个“不良行为”工作加载Spark UI并进入flatMapToPair阶段时,我发现实际上每个节点上运行着大约3-4个执行程序(与“不良行为”情况相同)。 。但是,除了一个完成之外,几乎所有其他操作都在不到一秒钟的时间内完成,其余的执行程序将运行10分钟,然后由于超出存储限制而被纱线杀死。

我已经尝试过的事情:

  1. 网络。搜索诸如“在一个节点上运行火花”之类的内容,几乎所有版本都会导致人们在Spark Shell或类似配置问题中以本地模式运行。鉴于我至少在某些时候表现良好,这些配置问题似乎不太可能(并且我检查了自己是否不是偶然地处于本地模式下,我有〜100个分区,...)。

  2. 在同一群集上运行的其他Spark作业表现良好。这似乎可以排除集群范围内的一些错误配置(哎呀,即使这项工作有时运行良好)。

  3. 群集利用率似乎并不会影响我是否表现良好。当群集被大量利用时,以及群集根本没有运行时,我都看到了这两种行为。

  4. 这似乎不是一个毛线问题,因为执行者都在整个集群中分布良好。我对此当然可以错了,但实际上问题似乎在于执行者之间的工作分配。

  5. 数据集中有多个键。我在coGroup和flatMapToPair之间插入了countByKey并打印了结果(用于20个左右的人口众多的键)。数据在这些最重要的键之间相当均匀地分布。

我为回应评论而尝试过的事情

  1. 在flatMapToPair调用之前对RDD重新分区,以强制进行500个分区。这只会将不良行为转移到重新分区阶段。

  2. 增加默认的并行性。我确实通过这种方式获得了更多分区,但是不良行为仍然停留在flatMapToPair阶段。

  3. 剥离数据(实际上,我在发布之前做了很多此类工作,但是未能将其包括在原始列表中)。我们只有10 GB的空间,而且我已经在加载所需的最小数据。

这是一个“有趣”的小heisenbug,其不良行为在添加调试日志后消失,​​然后在删除日志后消失,​​直到稍后再出现。我没有主意,因此,如果有人甚至建议了一些建议的诊断步骤,我将不知所措。

小智 1

我遇到了非常相似的事情,虽然我对解决方案并不完全满意,因为我无法完全解释它为什么有效,但它似乎确实有效。就我而言,这是经过洗牌之后的,并且洗牌后的数据的大小相当小。问题在于,后续计算显着增加了数据大小,导致在 1 或 2 个执行器上执行这些计算成为瓶颈。我最好的猜测是,它与涉及数据源的首选位置和目标分区大小的启发式方法有关,可能还与不知道后期阶段发生的扩展相结合。

通过添加 ,我能够获得一致、分布良好的随机播放coalesce(totalCores),其中totalCores定义为spark.executor.instancesx spark.executor.cores。它似乎也适用于更大的倍数totalCores,但就我而言,我不需要更多的并行性。请注意,可能需要根据用例使用repartition而不是使用。coalesce另外,这是在spark 2.2.1上的,供参考。