Spark Streaming保证特定的启动窗口时间

Edu*_*gan 6 scala apache-spark amazon-kinesis spark-streaming

我正在使用Spark Streaming使用Structured Streaming框架从Kinesis读取数据,我的连接如下

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streams", streamName)
  .option("endpointUrl", endpointUrl)
  .option("initialPositionInStream", "earliest")
  .option("format", "json")
  .schema(<my-schema>)
  .load
Run Code Online (Sandbox Code Playgroud)

数据来自几个具有唯一ID的物联网设备,我需要通过此ID和时间戳字段上的翻滚窗口聚合数据,如下所示:

val aggregateData = kinesis
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
    .agg(...)
Run Code Online (Sandbox Code Playgroud)

我遇到的问题是我需要保证每个窗口都在圆形时间开始(例如00:00,00:00:00:00等),同时我需要保证只有包含完整15-的行分钟长的窗口将输出到我的水槽,我目前正在做的是

val query = aggregateData
    .writeStream
      .foreach(postgreSQLWriter)
      .outputMode("update")
      .start()
      .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

postgreSQLWriter是我创建的StreamWriter,用于将每一行插入到PostgreSQL SGBD中.如何强制我的窗口长度为15分钟,开始时间为每个设备唯一ID的15分钟时间戳值?

小智 3

问题1:要在特定时间启动,spark分组函数还需要一个参数,即“offset”。通过指定它将在指定时间后从一小时开始示例:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))
Run Code Online (Sandbox Code Playgroud)

所以上面的语法将按column1分组并创建持续时间为22分钟的窗口,滑动窗口大小为1分钟,偏移量为15分钟

例如它从:

window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes)
window2: 8:16(previous window start + 1 minute) to 8:38 ( 22 minute size again)
Run Code Online (Sandbox Code Playgroud)

问题2:要仅推送那些具有完整 15 分钟大小的窗口,请创建一个计数列来计算该窗口中的事件数。一旦达到 15,使用过滤器命令将其推送到您想要的任何位置

计算计数:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))
Run Code Online (Sandbox Code Playgroud)

仅包含计数 15 的 writestream 过滤器:

aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()
Run Code Online (Sandbox Code Playgroud)