如果SparkSession没有关闭会发生什么?

Mar*_*ace 13 scala apache-spark apache-spark-sql

以下2有什么区别?

object Example1 {
    def main(args: Array[String]): Unit = {
        try {
            val spark = SparkSession.builder.getOrCreate
            // spark code here
        } finally {
            spark.close
        }
    }
}

object Example2 {
    val spark = SparkSession.builder.getOrCreate
    def main(args: Array[String]): Unit = {
        // spark code here
    }
}    
Run Code Online (Sandbox Code Playgroud)

我知道SparkSession实现了Closeable,它暗示它需要关闭.但是,如果SparkSession刚刚在Example2中创建并且从不直接关闭,我想不出任何问题.如果Spark应用程序失败(并从main方法退出)成功,JVM将终止,SparkSession将随之消失.它是否正确?IMO:SparkSession是一个单身人士的事实也不应该产生很大的影响.

Jac*_*ski 9

当你完成它的使用时,你应该总是关闭SparkSession它(即使最终的结果只是遵循一个好的做法,回馈你给你的东西).

关闭a SparkSession可能会触发释放可能提供给其他应用程序的群集资源.

SparkSession是一个会话,因此维护一些消耗JVM内存的资源.您可以拥有任意数量的SparkSession(请参阅SparkSession.newSession重新创建会话)但是您不希望它们使用它们不应该使用的内存,如果您不使用它而不是close您不再需要的内存.

SparkSession是Spark SQL围绕Spark Core的SparkContext的包装器,因此在封面下(如在任何Spark应用程序中),您将拥有分配给您SparkSession(通过SparkContext)的群集资源,即vcores和内存.这意味着只要您SparkContext正在使用(使用SparkSession),群集资源就不会分配给其他任务(不一定是Spark,也适用于提交给群集的其他非Spark应用程序).这些集群的资源是你,直到你说"我完成了"翻译为... close.

但是,如果之后close,您只需退出Spark应用程序,则无需考虑执行,close因为无论如何资源都将自动关闭.驱动程序和执行程序的JVM终止,集群的(心跳)连接也终止,因此最终将资源返回给集群管理器,以便它可以提供给其他应用程序使用.

  • 如果我错了,请纠正我,但Spark应用程序可能成功或失败.在任何一种情况下,Spark应用程序都会终止,与之关联的JVM也会终止.一旦JVM终止,所有资源都被释放(无论我是否调用close).另外,如果我使用`getOrCreate()`,SparkSession不是单例吗?你是什​​么意思"你可以拥有任意数量的SparkSessions"?谢谢! (4认同)

yug*_*har 6

两者都一样!

Spark会话的stop/ close最终调用spark上下文的stop

def stop(): Unit = {
  sparkContext.stop()
}

override def close(): Unit = stop()
Run Code Online (Sandbox Code Playgroud)

Spark上下文具有运行时关闭挂钩,以在退出JVM之前关闭 Spark上下文。请在下面找到用于在创建上下文时添加关闭挂钩的火花代码

ShutdownHookManager.addShutdownHook(
  _shutdownHookRef = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  stop()
}
Run Code Online (Sandbox Code Playgroud)

因此,无论JVM如何退出,都将调用它。如果您stop()手动进行操作,则此关闭挂钩将被取消以避免重复

def stop(): Unit = {
  if (LiveListenerBus.withinListenerThread.value) {
    throw new SparkException(
      s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
  }
  // Use the stopping variable to ensure no contention for the stop scenario.
  // Still track the stopped variable for use elsewhere in the code.
  if (!stopped.compareAndSet(false, true)) {
    logInfo("SparkContext already stopped.")
    return
  }
  if (_shutdownHookRef != null) {
    ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
  }
Run Code Online (Sandbox Code Playgroud)