相关疑难解决方法(0)

Stream-Static Join:如何定期刷新(非持久化/持久化)静态数据帧

我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接。批处理数据的源会定期更新。

因此,我计划定期对该批处理数据进行持久化/非持久化。

下面是我用来持久化和取消持久化批处理数据的示例代码。

流动:

  • 读取批处理数据
  • 持久化批处理数据
  • 每隔一小时,取消持久化数据并读取批处理数据并再次持久化。

但是,我没有看到批处理数据每小时刷新一次。

代码:

var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF =  handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法可以在 Spark 结构化流媒体作业中实现这种情况?

scala apache-spark spark-streaming apache-spark-sql spark-structured-streaming

4
推荐指数
1
解决办法
455
查看次数