我正在使用Spark Streaming使用Structured Streaming框架从Kinesis读取数据,我的连接如下
val kinesis = spark
.readStream
.format("kinesis")
.option("streams", streamName)
.option("endpointUrl", endpointUrl)
.option("initialPositionInStream", "earliest")
.option("format", "json")
.schema(<my-schema>)
.load
Run Code Online (Sandbox Code Playgroud)
数据来自几个具有唯一ID的物联网设备,我需要通过此ID和时间戳字段上的翻滚窗口聚合数据,如下所示:
val aggregateData = kinesis
.groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
.agg(...)
Run Code Online (Sandbox Code Playgroud)
我遇到的问题是我需要保证每个窗口都在圆形时间开始(例如00:00,00:00:00:00等),同时我需要保证只有包含完整15-的行分钟长的窗口将输出到我的水槽,我目前正在做的是
val query = aggregateData
.writeStream
.foreach(postgreSQLWriter)
.outputMode("update")
.start()
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
postgreSQLWriter是我创建的StreamWriter,用于将每一行插入到PostgreSQL SGBD中.如何强制我的窗口长度为15分钟,开始时间为每个设备唯一ID的15分钟时间戳值?