应用程序内的 Spark 调度:性能问题

Klu*_*lun 5 scala apache-spark spark-streaming apache-spark-sql databricks

我像这样实现 Apache Spark Scheduling Within (Scala 代码):

\n\n
// group into list of 10 items...\nval maxSimultaneousSubmitAndMonitorThreadsInDriver = 10\n\n// ... in order to throttle the number of threads submitting and monitoring apps at a time\nval lists = myList grouped maxSimultaneousThreadsInDriver \n\nfor (aList <- lists) {\n\n   // pick a list, then convert it to Scala Parallel list\n   aList.par.foreach { // so 10 threads MAX at a time, that can handle job submission and monitoring\n      case (file_name) => {\n\n        // in each driver thread, create different Spark session\n        val sparkChild = sparkMain.newSession()\n\n        // then do specific stuff with such session\n        val childDF = sparkChild.read.parquet( filename + "_directory/*.parquet") \n        ...\n     }\n   }\n\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

正如您所知,通过调度的概念,在这样的单个驱动程序实例中可以监视多个 Spark 应用程序。这样我就可以同时运行多个 Spark 应用程序。(在我的例子中,每个 Spark 应用程序都可以根据名称、根据业务规则执行每个文件读取的非常具体的任务)。

\n\n

调度程序默认配置为 FIFO 模式:

\n\n
\n

默认情况下,Spark\xe2\x80\x99s 调度程序以 FIFO 方式运行作业。每个作业分为\xe2\x80\x9cstages\xe2\x80\x9d(例如map和reduce阶段),第一个作业在所有可用资源上获得优先级,而其阶段有任务要启动,然后第二个作业获得优先级,等等。如果队列头部的作业 don\xe2\x80\x99t 需要使用整个集群,后面的作业可以立即开始运行 [...]

\n
\n\n

这样的解决方案对我有用。然而我发现Spark Scheduling Within有点慢。例如,当我看到Spark UI Executors 选项卡时,我可以看到大多数时候只使用很少的核心。

\n\n

使用 Spark Scheduling Within 模式,集群资源似乎明显未得到充分利用

\n\n

这与我拥有的经典 Spark 应用程序相反,后者几乎总是自然地完全消耗 CPU!

\n\n

所以我的最后一个问题是如何优化Spark Scheduling Within的性能?

\n\n

我尝试过的:

\n\n
    \n
  • 更改maxSimultaneousSubmitAndMonitorThreadsInDriver, 以限制在给定时间提交和监视应用程序的线程数量
  • \n
  • 试图增加spark.scheduler.listenerbus.eventqueue.capacity
  • \n
  • 尝试增加/减少spark.default.parallelism
  • \n
  • 尝试增加/减少spark.sql.shuffle.partitions
  • \n
\n\n

如果我增加可以同时提交和监视 Spark 应用程序的线程数量(使用节流系统),我最终会遇到 OOM。

\n\n

关于spark.default.parallelismspark.sql.shuffle.partitions,我不知道如何选择相关值。如果我不进行调度(每个驱动程序只有一个应用程序),我设置的值可能是 192(核心数量)以获得良好的结果。

\n\n

但对于内部调度来说,还不清楚。每个提交的作业都很小,并且每个作业的并行度 192 似乎有点过大(而且很慢?)。

\n\n

任何投入将不胜感激

\n

Ofe*_*Hod 5

首先,您定义maxSimultaneousSubmitAndMonitorThreadsInDriver=10然后使用maxSimultaneousThreadsInDriver而不是您刚刚声明的,这是有意的吗?

其次,尝试删除该行val sparkChild = sparkMain.newSession()并将下一行更改为val childDF = sparkMain.read.parquet( filename + "_directory/*.parquet"),它可以编译吗?如果确实如此,请再次检查。

您是否尝试增加执行者数量?
如果参数已存在于您的 Spark-Submit to 中--num-executors 20,则添加或更改,如果通过代码创建上下文,则在代码中的行conf.set("spark.executor.instances", 20)之前添加。 现在再运行一下,性能有提升吗?如果是但还不够,则增加到 40。 如果您仍然卡住,请继续阅读。 new SparkContext(conf)

Spark作业默认的运行行为是先进先出(FIFO),即优先考虑第一个作业,只有在第一个作业释放资源后还有可用资源时才执行后面的作业。
我猜你只得到 14 个任务(每个执行器 7 个),因为你的文件非常小,如果任务运行得非常快,那么重新分区不会解决问题,但允许并行作业可以。
由于您正在寻找作业之间的并行性,因此我建议您使用 FAIR 调度程序并为您创建的每个线程/作业分配不同的池。

为您的 Spark 应用程序配置公平共享,方法是添加到您的 Spark-submit 中--conf spark.scheduler.mode=FAIR,如果通过代码创建上下文,请在代码中的行 conf.set("spark.scheduler.mode", FAIR)之前添加。new SparkContext(conf)

在线程内执行任何作业之前分配随机池名称(您可以获取线程 ID,但即使对于同一线程,建议每个作业使用不同的池名称):

val randomString = scala.util.Random.alphanumeric.take(10).mkString("")
sparkMaster.setLocalProperty("spark.scheduler.pool", randomString)
val childDF = sparkMaster.read.parquet( filename + "_directory/*.parquet") 
Run Code Online (Sandbox Code Playgroud)

现在公平共享应该启动并在线程之间平均分配资源。
如果您仍然发现核心使用率较低,请尝试在不以 OOM 为目标的情况下将最大线程池容量最大化。
如果仍然很慢,请考虑重新分区到(max_cores / max_threads),在您的情况下(看到 2 个执行程序,有 192 个可用核心,即总共 384 个384/10=38,因此 repartition(38) 可能会有所帮助。

参考:https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application