Spark会话状态

rak*_*esh 7 python apache-spark pyspark

如何在我的 py Spark 代码中检查 SparkSession 状态?要求是检查sparkSession是否处于活动状态。如果sparksession未激活,创建另一个spark会话并调用一些函数

我正在 Jupyter 笔记本中编写并运行此代码。

spark = SparkSession.builder.master("yarn") \                                 
    .config("spark.dynamicAllocation.enabled", "true") \
                                    .config("spark.serializer", 
    "org.apache.spark.serializer.KryoSerializer")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.kryoserializer.buffer.max", "250m") \
    .config("spark.driver.memory", memory) \
    .config("spark.driver.cores", cores) \
    .config("spark.executor.cores", cores) \
    .config("spark.executor.memory", memory) \
    .config("spark.executor.instances", cores) \
    .enableHiveSupport()\
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)
  1. Spark 打印 SparkSession 详细信息

3.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“是”

  1. spark.stop()

它停止了 Spark 应用程序——我签入了 UI

但是当我再次在第三步运行代码时

5.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“yes”作为输出

  1. 但它确实会产生火花
error : AttributeError: 'NoneType' object has no attribute 'sc'
Run Code Online (Sandbox Code Playgroud)
  1. 但是当我运行下一个命令时我看到了奇怪的事情
df = spark.read.csv(file_name) 
Run Code Online (Sandbox Code Playgroud)

它创建了另一个应用程序并开始执行代码。

我想了解 SparkSession 是否被杀死

观察:a. if(spark) 给出 TRUE,因为它正在修剪其下方的驱动器。b. 当我只写“spark”时——给了我错误c。Spark.read.csv ---- 没有给出任何错误并启动了一个新的应用程序,但在一段时间后抛出了错误 - “无法在停止的 SparkContext 上调用方法。”

要求是检查我的代码/应用程序运行时 SparkSession 是否停止或失败。它应该自动重新启动

我本来想写

def func1:
    create spark session  
    code to execute 


def func2:
    while spark is active :
       time.sleep(200)
    if !spark is active:
        func1()


func1()

func2()
Run Code Online (Sandbox Code Playgroud)

小智 1

您需要自己管理此功能吗?正如方法名称所示,SparkSession.builder.getOrCreate()它的行为方式如下:

https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.SparkSession.builder.getOrCreate.html

获取一个现有的 SparkSession,或者,如果不存在,则根据此构建器中设置的选项创建一个新的 SparkSession。

如果您不需要自己管理,那么我建议您仅使用此方法并省略多余的代码。

如果您确实需要,那么我相信您的问题源于这里:

if (spark):
    print("yes")
Run Code Online (Sandbox Code Playgroud)

这是检查变量的“真实性” spark这是一篇讨论这个问题的好帖子。 简而言之:if (spark)正在检查是否spark未定义(设置为None)。 spark.stop()只是停止 SparkSession,它不会取消分配变量。

试试这个:

if (spark.getActiveSession()):
    print('yes')
else:
    print('no')
Run Code Online (Sandbox Code Playgroud)