Apache Spark:核心数与执行者数量

zeo*_*dtr 178 hadoop hadoop-yarn apache-spark

我试图了解在YARN上运行Spark作业时内核数量和执行程序数量之间的关系.

测试环境如下:

  • 数据节点数:3
  • 数据节点机器规格:
    • CPU:Core i7-4790(核心数:4,线程数:8)
    • 内存:32GB(8GB x 4)
    • 硬盘:8TB(2TB x 4)
  • 网络:1Gb

  • Spark版本:1.0.0

  • Hadoop版本:2.4.0(Hortonworks HDP 2.1)

  • Spark作业流程:sc.textFile - > filter - > map - > filter - > mapToPair - > reduceByKey - > map - > saveAsTextFile

  • 输入数据

    • 类型:单个文本文件
    • 尺寸:165GB
    • 行数:454,568,833
  • 产量

    • 第二次过滤后的行数:310,640,717
    • 结果文件的行数:99,848,268
    • 结果文件的大小:41GB

该作业使用以下配置运行:

  1. --master yarn-client --executor-memory 19G --executor-cores 7 --num-executors 3 (每个数据节点的执行程序,使用尽可能多的核心)

  2. --master yarn-client --executor-memory 19G --executor-cores 4 --num-executors 3 (核心数量减少)

  3. --master yarn-client --executor-memory 4G --executor-cores 2 --num-executors 12 (少核心,更多执行者)

经过的时间:

  1. 50分15秒

  2. 55分48秒

  3. 31分23秒

令我惊讶的是,(3)更快.
我认为(1)会更快,因为在改组时会有更少的执行者间通信.
虽然(1)的核心数小于(3),但核心数量不是关键因素,因为2)确实表现良好.

(在pwilmot回答之后添加了以下内容.)

有关信息,性能监视器屏幕截图如下:

  • (1)的Ganglia数据节点摘要 - 作业于04:37开始.

Ganglia数据节点摘要(1)

  • (3)的Ganglia数据节点摘要 - 工作于19:47开始.请在此之前忽略图表.

(3)的Ganglia数据节点摘要

该图大致分为两部分:

  • 第一:从开始到reduceByKey:CPU密集型,没有网络活动
  • 第二:在reduceByKey之后:CPU降低,网络I/O完成.

如图所示,(1)可以使用尽可能多的CPU功率.因此,它可能不是线程数量的问题.

如何解释这个结果?

小智 55

为了使所有这一切更加具体,这里有一个配置Spark应用程序以尽可能多地使用集群的工作示例:想象一个集群有六个运行NodeManagers的节点,每个节点配备16个内核和64GB内存.NodeManager容量,yarn.nodemanager.resource.memory-mb和yarn.nodemanager.resource.cpu-vcores应该分别设置为63*1024 = 64512(兆字节)和15.我们避免将100%的资源分配给YARN容器,因为该节点需要一些资源来运行OS和Hadoop守护进程.在这种情况下,我们为这些系统进程留下了一个千兆字节和一个核心.Cloudera Manager通过计算这些并自动配置这些YARN属性来帮助您.

可能的第一个冲动是使用--num-executors 6 --executor-cores 15 --executor-memory 63G.但是,这是错误的方法,因为:

63GB +执行程序内存开销不适合NodeManager的63GB容量.应用程序主机将占用其中一个节点上的核心,这意味着该节点上没有15核心执行程序的空间.每个执行程序15个核心可能导致错误的HDFS I/O吞吐量.

更好的选择是使用--num-executors 17 --executor-cores 5 --executor-memory 19G.为什么?

这个配置在所有节点上产生三个执行器,除了带有AM的节点,它将有两个执行器.--executor-memory派生为(每个节点63/3执行程序)= 21. 21*0.07 = 1.47.21 - 1.47~19.

cloudera的博客http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/上的一篇文章给出了解释.

  • “这个配置会在所有节点上产生三个执行程序,除了一个带有 AM 的执行程序,它将有两个执行程序。”。这对于“--executor-cores 5”意味着什么? (2认同)
  • 这意味着每个执行器使用 5 个核心。每个节点有 3 个执行器,因此使用 15 个核心,但其中一个节点还将运行该作业的应用程序主机,因此只能托管 2 个执行器,即 10 个核心用作执行器。 (2认同)
  • 很好地解释了 - 请注意,这适用于禁用“yarn.scheduler.capacity.resource-calculator”,这是默认设置。这是因为默认情况下它按内存而不是 CPU 进行调度。 (2认同)
  • 更多的执行程序会导致 HDFS I/O 吞吐量不佳。因此,如果我根本不使用 HDFS,在这种情况下,我可以为每个执行程序使用超过 5 个内核吗? (2认同)
  • 我认为应用程序主机在每个节点上运行。根据上述情况,这意味着只有 1 个应用程序主机来运行该作业。那是对的吗? (2认同)

小智 15

Sandy Ryza表示,当你在HDFS上运行你的火花应用程序时

我注意到HDFS客户端遇到大量并发线程的问题.粗略的猜测是,每个执行程序最多可以实现5个任务的完全写入吞吐量,因此最好将每个执行程序的核心数保持在该数量之下.

所以我认为你的第一个配置比第三个慢,因为HDFS I/O吞吐量很差


小智 11

我自己没有玩过这些设置,所以这只是推测,但如果我们将此问题视为分布式系统中的普通内核和线程,那么在您的群集中,您最多可以使用12个内核(4*3台机器)和24个线程(8*3台机器).在前两个示例中,您将为您的工作提供相当数量的核心(可能的计算空间),但在这些核心上运行的线程(作业)数量非常有限,以至于您无法使用分配的大部分处理能力因此,即使分配了更多的计算资源,作业也会变慢.

你提到你的问题是在洗牌阶段 - 尽管限制洗牌步骤的开销是很好的,但通常更重要的是利用集群的并行化.考虑一下极端情况 - 单线程程序,零洗牌.

  • @zeodtr pwilmot 是正确的 - 您至少需要 2-4 个任务才能充分利用核心的潜力。这么说吧 - 我通常为我的 80 核心集群使用至少 1000 个分区。 (2认同)

tur*_*nvh 7

简短答案:我认为tgbaggio是正确的。您在执行程序上达到HDFS吞吐量限制。

我认为这里的答案可能比这里的一些建议简单一些。

对我来说,线索是在群集网络图中。对于运行1,利用率稳定在〜50 M字节/秒。对于运行3,稳定利用率提高了一倍,约为100 M字节/秒。

该Cloudera的博客文章通过共享DzOrd,你可以看到这个重要的报价:

我注意到HDFS客户端遇到大量并发线程的麻烦。粗略猜测是,每个执行者最多可以完成五个任务,以实现完整的写入吞吐量,因此最好将每个执行者的内核数保持在该数量以下。

因此,让我们做一些计算,看看如果这是真的,我们期望得到什么性能。


运行1:19 GB,7核,3个执行程序

  • 3个执行者x 7个线程= 21个线程
  • 每个执行者具有7个内核,我们希望对HDFS的IO有限(最多5个内核)
  • 有效吞吐量〜= 3个执行程序x 5个线程= 15个线程

运行3:4 GB,2个内核,12个执行程序

  • 2个执行程序x 12个线程= 24个线程
  • 每个执行程序2个内核,因此hdfs吞吐量还可以
  • 有效吞吐量〜= 12个执行程序x 2个线程= 24个线程

如果作业是100%受并发(线程数)限制的。我们希望运行时与线程数完全成反比。

ratio_num_threads = nthread_job1 / nthread_job3 = 15/24 = 0.625
inv_ratio_runtime = 1/(duration_job1 / duration_job3) = 1/(50/31) = 31/50 = 0.62
Run Code Online (Sandbox Code Playgroud)

因此ratio_num_threads ~= inv_ratio_runtime,看来我们受网络限制。

相同的效果说明了运行1和运行2之间的区别。


运行2:19 GB,4核,3个执行程序

  • 3个执行者x 4个线程= 12个线程
  • 每个执行器具有4个内核,可以将IO转换为HDFS
  • 有效吞吐量〜= 3个执行程序x 4个线程= 12个线程

比较有效线程数和运行时:

ratio_num_threads = nthread_job2 / nthread_job1 = 12/15 = 0.8
inv_ratio_runtime = 1/(duration_job2 / duration_job1) = 1/(55/50) = 50/55 = 0.91
Run Code Online (Sandbox Code Playgroud)

它不像上次比较那样完美,但是当丢失线程时,我们仍然会看到类似的性能下降。

现在来看最后一点:为什么会出现这样的情况,尤其是在使用更多线程的情况下我们可以获得更好的性能。比CPU数量更多的线程?

并行性(我们所得到的瓜分到多个CPU数据)和并发性(我们得到什么,当我们使用多线程做单CPU的工作),在这个伟大的职位由罗布·派克提供之差的一个很好的解释:并发不是并行性

简短的解释是,如果Spark作业正在与文件系统或网络进行交互,则CPU将花费大量时间等待与这些接口的通信,而实际上并没有花费大量时间来“工作”。通过给这些CPU一次执行多个任务,他们将花费更少的等待时间和更多的工作时间,并且您会看到更好的性能。

  • 有趣且令人信服的解释,我想知道您是否如何猜测执行器有 **5** 任务限制以实现最大吞吐量。 (2认同)

d8a*_*nja 5

RStudio的Sparklyr软件包页面上可用的优秀资源中:

火花定义

为Spark术语提供一些简单的定义可能会很有用:

节点:服务器

工作者节点:属于集群的服务器,可用于运行Spark作业

主节点:协调工作节点的服务器。

执行程序:节点内的一种虚拟机。一个节点可以有多个执行程序。

驱动程序节点:启动Spark会话的节点。通常,这将是sparklyr所在的服务器。

驱动程序(执行程序):驱动程序节点也将显示在执行程序列表中。