Spark 2.4 到 Elasticsearch:防止 Dataproc 节点停用期间数据丢失?

Fre*_*ier 3 elasticsearch apache-spark elasticsearch-hadoop google-cloud-dataproc

我的技术任务是将数据从GCS(Google Cloud Storage)同步到我们的Elasticsearch集群。

我们在 Google Dataproc 集群(启用自动扩展)上使用 Apache Spark 2.4 和 Elastic Hadoop 连接器。

在执行过程中,如果 Dataproc 集群缩小规模,停用节点上的所有任务都会丢失,并且该节点上处理的数据永远不会推送到弹性。

例如,当我保存到 GCS 或 HDFS 时,就不存在此问题。

即使节点退役,如何使这项任务具有弹性?

堆栈跟踪的摘录:

阶段 2.3 中丢失任务 50.0 (TID 427, xxxxxxx-sw-vrb7.c.xxxxxxx, 执行器 43): FetchFailed(BlockManagerId(30, xxxxxxx-w-23.c.xxxxxxx, 7337, None), shuffleId=0, mapId =26,reduceId=170,message=org.apache.spark.shuffle.FetchFailedException:无法连接到 xxxxxxx-w-23.c.xxxxxxx:7337

引起原因:java.net.UnknownHostException:xxxxxxx-w-23.c.xxxxxxx

阶段 2.3 中的任务 50.0 (TID 427) 失败,但该任务不会重新执行(要么是因为该任务因 shuffle 数据获取失败而失败,所以需要重新运行前一个阶段,要么是因为不同的副本任务已经成功)。

谢谢。弗雷德

小智 5

我将介绍一些有关“缩小规模问题”的背景知识以及如何缓解它。请注意,此信息适用于手动缩减规模以及抢占式虚拟机被抢占。

背景

自动缩放根据集群中“可用”YARN 内存量删除节点。它不考虑集群上的随机数据。这是我们最近提供的演示中的一个插图。

在 MapReduce 风格的作业中(Spark 作业是阶段之间的一组 MapReduce 风格的 shuffle),来自所有映射器的数据必须到达所有减速器。Mappers将其shuffle数据写入本地磁盘,然后reducer从每个mapper获取数据。每个节点上都有一个服务器专门用于提供 shuffle 数据,并且它在 YARN 外部运行。因此,节点在 YARN 中可能会显示为空闲,即使它需要留下来提供其 shuffle 数据。

正常洗牌

当单个节点被删除时,几乎所有减速器都会失败,因为它们都需要从每个节点获取数据。减速器将特别失败FetchFailedException(如您所见),表明它们无法从特定节点获取随机数据。驱动程序最终将重新运行必要的映射器,然后重新运行reduce阶段。Spark 效率有点低(https://issues.apache.org/jira/browse/SPARK-20178),但它确实有效。

当节点被删除时进行随机播放

请注意,您可能会在以下三种情况之一中丢失节点:

  1. 故意删除节点(自动缩放或手动缩小)
  2. 可抢占虚拟机 被抢占。可抢占式虚拟机至少每 24 小时被抢占一次。
  3. (相对罕见)标准 GCE VM 被 GCE 非正常终止并重新启动。通常,标准虚拟机是透明实时迁移的

当您创建自动扩缩集群时,Dataproc会添加多个属性来提高作业在节点丢失时的恢复能力:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10
spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h
Run Code Online (Sandbox Code Playgroud)

请注意,如果您在现有集群上启用自动缩放,则不会设置这些属性。(但是您可以在创建集群时手动设置它们)。

缓解措施

1) 使用优雅退役

Dataproc 与YARN 的 Graceful Decommissioning集成,并且可以设置自动扩展策略或手动缩减操作。

当优雅地停用节点时,YARN 会保留该节点,直到该节点上运行容器的应用程序完成,但不会让它运行新容器。这使得节点有机会在被删除之前提供其随机数据。

您需要确保您的优雅退役超时足够长,以涵盖您最长的工作。自动缩放文档建议1h作为起点。

请注意,只有处理大量短期作业的长时间运行的集群才真正有意义。

对于临时集群,您最好从一开始就“调整大小”集群,或者禁用缩减规模,除非集群完全空闲(设置scaleDownMinWorkerFraction=1.0)。

2) 避免抢占式虚拟机

即使使用优雅退役,可抢占虚拟机也会通过“抢占”定期终止。GCE保证可抢占的虚拟机将在24小时内被抢占,并且大型集群上的抢占非常分散。

如果您使用正常停用,并且FetchFailedException错误消息包括-sw-,您可能会看到由于节点被抢占而导致获取失败。

您有两个选项可以避免使用抢占式虚拟机: 1. 在自动扩展策略中,您可以将secondaryWorkerConfig最小和最大实例设置为 0,并将所有工作线程放入主要组中。2. 或者,您可以继续使用“辅助”工作人员,但设置--properties dataproc:secondary-workers.is-preemptible.override=false。这将使您的辅助工作人员成为标准虚拟机。

3)长期:增强灵活性模式

Dataproc 的增强灵活性模式是解决随机播放问题的长期解决方案。

缩减问题是由存储在本地磁盘上的随机数据引起的。EFM 将包括新的shuffle 实现,允许将 shuffle 数据放置在一组固定的节点(例如,仅主工作节点)或集群外部的存储上。

这将使二级工作者成为无国籍人,这意味着他们可以随时被移除。这使得自动缩放变得更加引人注目。

目前,EFM 仍处于 Alpha 阶段,无法扩展到实际工作负载,但预计会在夏季推出可投入生产的 Beta 版。