我正在尝试使用mapGroupsWithState方法对传入的数据流进行有状态结构化流处理。但我面临的问题是,我为groupByKey选择的键使我的状态太大太快。明显的出路是更改密钥,但我希望在更新方法中应用的业务逻辑要求密钥与我现在拥有的密钥完全相同,或者如果可能的话,访问所有密钥的GroupState 。
例如,我有来自各个组织的数据流,通常组织包含 userId、personId 等。请参阅下面的代码:
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
.groupByKey(key => key.orgId)
.mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
.writeStream
.outputMode(Update())
.option("checkpointLocation", s"$checkpointLocation/$name")
.foreach(new UserCountWriter(spark.sparkContext.getConf))
.outputMode(Update())
.queryName(name)
.trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
Run Code Online (Sandbox Code Playgroud)
案例类别:
case class User(
orgId: Long,
profileId: Long,
userId: Long)
case class UserStatistic(
orgId: Long,
known: Long,
uknown: Long,
userSeq: Seq[User])
Run Code Online (Sandbox Code Playgroud)
更新方法:
def updateUserStatistic(
orgId: Long,
newEvents: Iterator[User],
oldState: GroupState[UserStatistic]): UserStatistic = { …Run Code Online (Sandbox Code Playgroud) aggregate apache-spark spark-streaming spark-structured-streaming