Spark:以编程方式获取集群核心数

Rou*_*her 7 java core dataset hadoop-yarn apache-spark

我在纱线集群中运行我的火花应用程序.在我的代码中,我使用数量可用的队列核心来创建我的数据集上的分区:

Dataset ds = ...
ds.coalesce(config.getNumberOfCores());
Run Code Online (Sandbox Code Playgroud)

我的问题:如何通过编程方式而不是配置来获取队列的可用数量?

Sim*_*Sim 13

有一些方法可以从Spark中获取集群中的执行程序数和核心数.这是我过去使用过的一些Scala实用程序代码.您应该能够轻松地将其适应Java.有两个关键的想法:

  1. 工人的数量是执行者的数量减去一个或sc.getExecutorStorageStatus.length - 1.

  2. 可以通过java.lang.Runtime.getRuntime.availableProcessors在worker上执行来获得每个worker的核心数.

其余代码是样板,用于添加SparkContext使用Scala 含义的便捷方法.我写了1.x年前的代码,这就是它没有使用的原因SparkSession.

最后一点:合并到多个核心通常是一个好主意,因为这可以在数据偏斜的情况下提高性能.实际上,我使用1.5x到4x之间的任何位置,具体取决于数据的大小以及作业是否在共享集群上运行.

import org.apache.spark.SparkContext

import scala.language.implicitConversions


class RichSparkContext(val sc: SparkContext) {

  def executorCount: Int =
    sc.getExecutorStorageStatus.length - 1 // one is the driver

  def coresPerExecutor: Int =
    RichSparkContext.coresPerExecutor(sc)

  def coreCount: Int =
    executorCount * coresPerExecutor

  def coreCount(coresPerExecutor: Int): Int =
    executorCount * coresPerExecutor

}


object RichSparkContext {

  trait Enrichment {
    implicit def enrichMetadata(sc: SparkContext): RichSparkContext =
      new RichSparkContext(sc)
  }

  object implicits extends Enrichment

  private var _coresPerExecutor: Int = 0

  def coresPerExecutor(sc: SparkContext): Int =
    synchronized {
      if (_coresPerExecutor == 0)
        sc.range(0, 1).map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
      else _coresPerExecutor
    }

}
Run Code Online (Sandbox Code Playgroud)

  • 有时执行器内核会过度配置或配置不足,这意味着 JVM 运行时功能可能不准确。 (2认同)
  • 仅供参考,从 Spark 2.4.4 开始,getExecutorStorageStatus 不再可用 (2认同)

Ste*_*e C 8

在寻找几乎相同问题的答案时发现了这一点。

我找到:

Dataset ds = ...
ds.coalesce(sc.defaultParallelism());
Run Code Online (Sandbox Code Playgroud)

正是OP正在寻找的东西。

例如,我的 5 节点 x 8 核心集群为defaultParallelism.