我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接。批处理数据的源会定期更新。
因此,我计划定期对该批处理数据进行持久化/非持久化。
下面是我用来持久化和取消持久化批处理数据的示例代码。
流动:
但是,我没有看到批处理数据每小时刷新一次。
代码:
var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()
if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
refreshedTime = Instant.now()
batchDF.unpersist(false)
batchDF = handler.readBatchDF(sparkSession)
.persist(StorageLevel.MEMORY_AND_DISK)
}
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法可以在 Spark 结构化流媒体作业中实现这种情况?
scala apache-spark spark-streaming apache-spark-sql spark-structured-streaming