如何配置checkpoint重新部署spark streaming应用程序?

Bin*_*ang 5 bigdata apache-spark spark-streaming

我正在使用Spark流来计算唯一身份用户.我用updateStateByKey,所以我需要配置一个检查点目录.我还在启动应用程序时从检查点加载数据,如doc中的示例所示:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Run Code Online (Sandbox Code Playgroud)

这是一个问题,如果我的代码被更改,那么我重新部署代码,无论代码更改多少,都会加载检查点吗?或者我需要使用自己的逻辑来持久化我的数据并在下次运行时加载它们.

如果我使用自己的逻辑来保存和加载DStream,那么如果应用程序在失败时重新启动,那么数据是否都不会从checkpoint目录和我自己的数据库加载?

Xua*_*ham 3

检查点本身包括您的元数据、rdd、dag 甚至您的逻辑。如果您更改逻辑并尝试从最后一个检查点运行它,您很可能会遇到异常。如果你想使用自己的逻辑将数据保存在某个地方作为检查点,你可能需要实现一个 Spark 操作来将检查点数据推送到任何数据库,在下一次运行中,将检查点数据加载为初始 RDD(如果你正在使用 updateStateByKey API)并继续您的逻辑。