一个应用程序可以有多少个SparkSession?

irb*_*ull 5 hadoop-yarn apache-spark apache-spark-sql

我发现当Spark运行时,表的大小(通过Joins)增长,火花执行器最终会耗尽内存并导致整个系统崩溃.即使我尝试将临时结果写入Hive表(在HDFS上),系统仍然没有释放大量内存,并且我的整个系统在大约130个连接后崩溃.

但是,通过实验,我意识到如果我将问题分解成更小的部分,将临时结果写入hive表,并停止/启动Spark会话(和spark上下文),则系统的资源被释放.我能够使用这种方法加入超过1,000列.

但是我找不到任何文档来理解这是否被认为是一种好的做法(我知道你不应该同时获得多个会话).大多数系统在开始时获取会话并在最后关闭它.我也可以将应用程序分解为较小的应用程序,并使用像Oozie这样的驱动程序在Yarn上安排这些较小的应用程序.但是这种方法会在每个阶段启动和停止JVM,这似乎有点重量级.

所以我的问题是:在单个spark应用程序运行期间不断启动/停止spark会话以释放系统资源是不好的做法吗?


但是,您能详细说明单个JVM上单个SparkContext的含义吗?我能够调用sparkSession.sparkContext().stop(),同时也stopSparkSession.然后我创建了一个新的SparkSession并使用了新的sparkContext.没有抛出任何错误.

我也能在JavaSparkPi没有任何问题的情况下使用它.

我已经测试了这个yarn-clientlocal安装了火花.

停止火花环境究竟做了什么,为什么一旦你停止了它就不能创建一个新的?

Jac*_*ski 9

TL; DR您可以SparkSession根据需要拥有尽可能多的s.

您可以SparkContext在一个JVM上只有一个,但SparkSessions 的数量几乎是无限的.

但是,您能详细说明单个JVM上单个SparkContext的含义吗?

这意味着在Spark应用程序的生命周期中的任何给定时间,驱动程序只能是一个且只有一个,这反过来意味着SparkContext该JVM上只有一个可用.

Spark应用程序的驱动程序是SparkContext生命的所在(或者相反,而不是SparkContext定义驱动程序 - 区别非常模糊).

你一次只能有一个SparkContext.虽然你可以根据需要多次启动和停止它,但我记得有一个问题,它说你不应该关闭,SparkContext除非你已经完成了Spark(通常发生在Spark应用程序的最后).

换句话说,SparkContext在Spark应用程序的整个生命周期中都有一个.

有一个类似的问题SparkSession.sql与Dataset.sqlContext.sql有什么区别?关于多个SparkSessions,可以更清楚地说明为什么你想要有两个或更多的会话.

我能够调用sparkSession.sparkContext().stop(),同时也stopSparkSession.

所以?!这与我说的如何相矛盾?!您停止SparkContext了JVM上唯一可用的.没有大碍.你可以,但这只是"你只能SparkContext在一个JVM上只有一个而且只有一个"的一部分,不是吗?

SparkSession仅仅是SparkContext在Spark Core的RDD之上提供Spark SQL的结构化/ SQL功能的包装器.

从Spark SQL开发人员的角度来看,a的目的SparkSession是成为查询实体的命名空间,例如查询使用的表,视图或函数(如DataFrames,Datasets或SQL)和Spark属性(每个可能有不同的值SparkSession).

如果您希望为不同的数据集使用相同的(临时)表名,那么创建两个SparkSessions将是我认为推荐的方式.

我刚刚开发了一个示例来展示整个阶段的codegen如何在Spark SQL中工作,并创建了以下功能,只需关闭该功能即可.

// both where and select operators support whole-stage codegen
// the plan tree (with the operators and expressions) meets the requirements
// That's why the plan has WholeStageCodegenExec inserted
// You can see stars (*) in the output of explain
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)
scala> q.explain
== Physical Plan ==
*Project [_2#89 AS c0#93]
+- *Filter (_1#88 = 0)
   +- LocalTableScan [_1#88, _2#89, _3#90]

// Let's break the requirement of having up to spark.sql.codegen.maxFields
// I'm creating a brand new SparkSession with one property changed
val newSpark = spark.newSession()
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_MAX_NUM_FIELDS
newSpark.sessionState.conf.setConf(WHOLESTAGE_MAX_NUM_FIELDS, 2)

scala> println(newSpark.sessionState.conf.wholeStageMaxNumFields)
2

// Let's see what's the initial value is
// Note that I use spark value (not newSpark)
scala> println(spark.sessionState.conf.wholeStageMaxNumFields)
100

import newSpark.implicits._
// the same query as above but created in SparkSession with WHOLESTAGE_MAX_NUM_FIELDS as 2
val q = Seq((1,2,3)).toDF("id", "c0", "c1").where('id === 0).select('c0)

// Note that there are no stars in the output of explain
// No WholeStageCodegenExec operator in the plan => whole-stage codegen disabled
scala> q.explain
== Physical Plan ==
Project [_2#122 AS c0#126]
+- Filter (_1#121 = 0)
   +- LocalTableScan [_1#121, _2#122, _3#123]
Run Code Online (Sandbox Code Playgroud)

然后我创建了一个新的SparkSession并使用了新的SparkContext.没有抛出任何错误.

再次,这与我所说的单一SparkContext可用内容有何矛盾?我很好奇.

停止火花环境究竟做了什么,为什么一旦你停止了它就不能创建一个新的?

您不能再使用它来运行Spark作业(处理大型和分布式数据集),这正是您首先使用Spark的原因,不是吗?

请尝试以下方法:

  1. 停止 SparkContext
  2. 使用Spark Core的RDD或Spark SQL的数据集API执行任何处理

例外?对!请记住,你关闭了Spark的"大门",那么你怎么能期望进入里面?!:)

  • 你一次只能有一个`SparkContext`,所以你可以根据需要启动和停止它,但我记得你不应该关闭`SparkContext`,除非你完成了Spark(通常发生在Spark应用程序的最后) ). (2认同)