以编程方式向 Spark 会话添加/删除执行器

Wil*_*ter 5 scala dynamic executors apache-spark

我正在 Spark (v2+) 中寻找一种可靠的方法来以编程方式调整会话中执行器的数量。

我了解动态分配以及在创建会话时配置 Spark 执行器的能力(例如使用--num-executors),但是由于我的 Spark 作业的性质,这些选项对我来说都不是很有用。

我的火花工作

该作业对大量数据执行以下步骤:

  1. 对数据执行一些聚合/检查
  2. 将数据加载到Elasticsearch(ES集群通常比Spark集群小得多)

问题

  • 如果我使用全套可用 Spark 资源,我将很快使 Elasticsearch 过载,甚至可能导致 Elasticsearch 节点崩溃。
  • 如果我使用足够少量的 Spark 执行器,以免压垮 Elasticsearch,则步骤 1 花费的时间会比实际需要的时间长得多(因为它只占可用 Spark 资源的一小部分)

我很感激我可以将此作业分成两个作业,分别使用不同的 Spark 资源配置文件执行,但我真正想要的是在 Spark 脚本中的特定点(在 Elasticsearch 加载开始之前)以编程方式将执行器的数量设置为 X )。这似乎是一件普遍可以做的有用的事情。

我的初步尝试

我尝试了一些更改设置,发现了一些可行方法,但感觉像是一种老套的方式来做一些应该可以以更标准化和受支持的方式完成的事情。

我的尝试(这只是我的尝试):

def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect { 
  case bm if !bm.isDriver => bm
}

def reduceExecutors(totalNumber: Int): Unit = {
  //TODO throw error if totalNumber is more than current
  logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  val killedExecutors = scala.collection.mutable.ListBuffer[String]()
  while (getExecutors.size > totalNumber) {
      val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
      val executorsToKill =  Random.shuffle(executorIds).take(executorIds.size - totalNumber)
      spark.sparkContext.killExecutors(executorsToKill)
      killedExecutors ++= executorsToKill
      Thread.sleep(1000)
  }
}

def increaseExecutors(totalNumber: Int): Unit = {
  //TODO throw error if totalNumber is less than current
  logger.info(s"""Attempting to increase number of executors to $totalNumber""")
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  while (getExecutors.size < totalNumber) {
      Thread.sleep(1000)
  }
}
Run Code Online (Sandbox Code Playgroud)

Den*_*nko 2

您可以尝试的一件事是致电

val dfForES = df.coalesce(numberOfParallelElasticSearchUploads) 
Run Code Online (Sandbox Code Playgroud)

在步骤 #2 之前。这将减少分区数量,而不会产生混洗开销,并确保只有 max numberOfParallelElasticSearchUploads 执行程序并行向 ES 发送数据,而其余执行程序则处于空闲状态。

如果您在共享集群上运行作业,我仍然建议启用动态分配来释放这些空闲执行器,以获得更好的资源利用率。