Wil*_*ter 5 scala dynamic executors apache-spark
我正在 Spark (v2+) 中寻找一种可靠的方法来以编程方式调整会话中执行器的数量。
我了解动态分配以及在创建会话时配置 Spark 执行器的能力(例如使用--num-executors
),但是由于我的 Spark 作业的性质,这些选项对我来说都不是很有用。
该作业对大量数据执行以下步骤:
我很感激我可以将此作业分成两个作业,分别使用不同的 Spark 资源配置文件执行,但我真正想要的是在 Spark 脚本中的特定点(在 Elasticsearch 加载开始之前)以编程方式将执行器的数量设置为 X )。这似乎是一件普遍可以做的有用的事情。
我尝试了一些更改设置,发现了一些可行的方法,但感觉像是一种老套的方式来做一些应该可以以更标准化和受支持的方式完成的事情。
我的尝试(这只是我的尝试):
def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect {
case bm if !bm.isDriver => bm
}
def reduceExecutors(totalNumber: Int): Unit = {
//TODO throw error if totalNumber is more than current
logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
val killedExecutors = scala.collection.mutable.ListBuffer[String]()
while (getExecutors.size > totalNumber) {
val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
spark.sparkContext.killExecutors(executorsToKill)
killedExecutors ++= executorsToKill
Thread.sleep(1000)
}
}
def increaseExecutors(totalNumber: Int): Unit = {
//TODO throw error if totalNumber is less than current
logger.info(s"""Attempting to increase number of executors to $totalNumber""")
spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
while (getExecutors.size < totalNumber) {
Thread.sleep(1000)
}
}
Run Code Online (Sandbox Code Playgroud)
您可以尝试的一件事是致电
val dfForES = df.coalesce(numberOfParallelElasticSearchUploads)
Run Code Online (Sandbox Code Playgroud)
在步骤 #2 之前。这将减少分区数量,而不会产生混洗开销,并确保只有 max numberOfParallelElasticSearchUploads 执行程序并行向 ES 发送数据,而其余执行程序则处于空闲状态。
如果您在共享集群上运行作业,我仍然建议启用动态分配来释放这些空闲执行器,以获得更好的资源利用率。