Spark Streaming中的序列化问题

liz*_*isk 9 apache-spark spark-streaming apache-spark-sql apache-spark-ml

我对Spark如何处理引擎盖下的数据感到很困惑.例如,当我运行流作业并应用时foreachRDD,行为取决于是从外部作用域捕获变量还是在内部初始化变量.

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我得到一个例外:

java.io.NotSerializableException:org.apache.spark.streaming.kafka.DirectKafkaInputDStream的对象$ DirectKafkaInputDStreamCheckpointData可能作为RDD操作关闭的一部分进行序列化.这是因为正在从闭包内引用DStream对象.请在此DStream中重写RDD操作以避免这种情况.这已被强制执行以避免使用不必要的对象使Spark任务膨胀.

但如果我搬进去sparkConf,一切似乎都很好:

dStream.foreachRDD(rdd => {
    val sparkConf = rdd.sparkContext.getConf
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})
Run Code Online (Sandbox Code Playgroud)

这看起来很奇怪,因为我认为它foreachRDD在驱动程序节点上运行,所以我没想到会有什么不同.

现在,如果我同时移动SQL会话和配置foreachRDD,它再次正常工作:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})
Run Code Online (Sandbox Code Playgroud)

Spark文档中的一个片段建议使用以前的版本(其中配置和SQL上下文都在foreachRDD其中创建),这对我来说似乎效率较低:如果只创建一次,为什么要为每个批次创建它们?

有人可以解释为什么抛出异常以及创建SQL上下文的正确方法是什么?

小智 1

ForeachRDD 运行,顾名思义,foreach rdd 在流中运行,为什么你应该在每个 rdd 上重新创建 Spark 上下文?正确的做法是最后一种:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})
Run Code Online (Sandbox Code Playgroud)