Spark集群上"Locality Level"的含义是什么?

fan*_*nhk 43 cluster-computing apache-spark

标题"地点级别"和5状态数据本地 - >进程本地 - >节点本地 - >机架本地 - >任何?的含义是什么?

在此输入图像描述

Dan*_* H. 46

据我所知,位置级别表示已执行哪种类型的数据访问.当节点完成其所有工作并且其CPU变为空闲时,Spark可能决定启动其他需要从其他位置获取数据的待处理任务.理想情况下,所有任务都应该是本地进程,因为它与较低的数据访问延迟相关联.

您可以使用以下命令配置等待时间,然后再转到其他位置级别:

spark.locality.wait
Run Code Online (Sandbox Code Playgroud)

有关参数的更多信息,请参阅Spark配置文档

对于不同层次PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL,或任何我认为方法findTaskfindSpeculativeTaskorg.apache.spark.scheduler.TaskSetManager说明根据自己的本地国家级星火如何选择任务.它首先检查将在同一执行程序进程中启动的PROCESS_LOCAL任务.如果没有,它将检查可能在同一节点中的其他执行程序中的NODE_LOCAL任务,或者需要从HDFS,缓存等系统中检索NACKE_LOCAL任务.RACK_LOCAL表示数据在另一个节点中,因此需要先传输执行.最后,ANY只是承担可能在当前节点中运行的任何挂起任务.

  /**
   * Dequeue a pending task for a given node and return its index and locality level.
   * Only search for tasks matching the given locality constraint.
   */
  private def findTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value)] =
  {
    for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL))
      }
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- findTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL))
      }
    }

    // Look for no-pref tasks after rack-local tasks since they can run anywhere.
    for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))
    }

    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- findTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY))
      }
    }

    // Finally, if all else has failed, find a speculative task
    findSpeculativeTask(execId, host, locality)
  }
Run Code Online (Sandbox Code Playgroud)


Eug*_*ene 10

这是我的两分钱,我主要从Spark 官方指南中总结出来。

首先,我想再添加一个局部性级别,这NO_PREF已在本线程中讨论过。
然后,让我们将这些级别放在一个表中,

在此输入图像描述

值得注意的是,可以根据Spark 配置 的指南跳过特定级别。

例如,如果你想跳过NODE_LOCAL,只需设置spark.locality.wait.node为0。