CheckPointing 时 foreachRDD() 中使用的对象的序列化

how*_*ard 3 avro kryo apache-spark spark-streaming rdd

根据这个问题和我读过的文档,Spark Streaming 的 foreachRDD( someFunction ) 只会在驱动程序进程中执行someFunction本身,但如果在 RDD 上完成了操作,那么它们将在执行器上完成 - RDD 所在的位置。

以上所有内容也适用于我,尽管我注意到如果我打开检查点,那么似乎 spark 正在尝试序列化 foreachRDD( someFunction ) 中的所有内容并发送到某个地方 - 这对我造成了问题,因为使用的对象之一不可序列化(即 schemaRegistryClient)。我尝试了 Kryo 序列化程序,但也没有运气。

如果我关闭检查点,序列化问题就会消失。

有没有办法让 Spark 不序列化 foreachRDD( someFunc ) 中使用的内容,同时继续使用检查点?

非常感谢。

Yuv*_*kov 5

有没有办法让 Spark 不序列化 foreachRDD(someFunc) 中使用的内容,同时继续使用检查点?

检查点不应该与您的问题有关。根本问题是您有一个不可序列化的对象实例,需要将其发送给您的工作人员。

当你有这样的依赖时,在 Spark 中有一个通用的模式。您创建一个object带有惰性瞬态属性的对象,该属性将在需要时加载到工作节点中

object RegisteryWrapper {
  @transient lazy val schemaClient: SchemaRegisteryClient = new SchemaRegisteryClient()
}
Run Code Online (Sandbox Code Playgroud)

当你需要在里面使用它时foreachRDD

someStream.foreachRDD { 
   rdd => rdd.foreachPartition { iterator => 
       val schemaClient = RegisteryWrapper.schemaClient
       iterator.foreach(schemaClient.send(_))
  }
}
Run Code Online (Sandbox Code Playgroud)