utk*_*ava 4 aggregate apache-spark spark-streaming spark-structured-streaming
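I am trying to do stateful Structured Streaming on an incoming stream of data using the mapGroupsWithState method. The problem is that the key I chose for groupByKey makes my state grow too large, too quickly. The obvious way out is to change the key, but the business logic I want to apply in the update method requires the key to be exactly what I have now or, if possible, access to the GroupState of all keys.
For example, I have a stream of data coming from various organizations, and an organization typically contains userId, personId, etc. See the code below: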
val stream: Dataset[User] = dataFrame.as[User]
val noTimeout = GroupStateTimeout.NoTimeout
val statisticStream = stream
  .groupByKey(key => key.orgId)
  .mapGroupsWithState(noTimeout)(updateUserStatistic)
val df = statisticStream.toDF()
val query = df
  .writeStream
  .outputMode(Update())
  .option("checkpointLocation", s"$checkpointLocation/$name")
  .foreach(new UserCountWriter(spark.sparkContext.getConf))
  .queryName(name)
  .trigger(Trigger.ProcessingTime(Duration.apply("10 seconds")))
  .start() // start the query so that `query` is a running StreamingQuery
Case classes:
case class User(
  orgId: Long,
  profileId: Long,
  userId: Long)
case class UserStatistic(
  orgId: Long,
  known: Long,
  uknown: Long,
  userSeq: Seq[User])
Update method:
def updateUserStatistic(
    orgId: Long,
    newEvents: Iterator[User],
    oldState: GroupState[UserStatistic]): UserStatistic = {
  var state: UserStatistic = if (oldState.exists) oldState.get else UserStatistic(orgId, 0L, 0L, Seq.empty)
  for (event <- newEvents) {
    // Business logic: check whether the userId in this organization is of a certain type,
    // then update the known or unknown attribute for that particular user accordingly.
  }
  oldState.update(state)
  state
}
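The problem gets worse when I have to run this on the Driver-Executor model, because I expect 1 to 10 million users per organization, which could mean that much state on a single executor (please correct me if I have misunderstood).
Possible solutions that failed:
Any help or suggestions are appreciated.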
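Your state keeps growing because, in the current implementation, no key/state pair is ever removed from the GroupState.
To mitigate exactly the problem you are facing (unbounded state growth), the mapGroupsWithState method lets you use a timeout. You can choose between two types of timeout:
GroupStateTimeout.ProcessingTimeTimeout with GroupState.setTimeoutDuration(), or GroupStateTimeout.EventTimeTimeout with GroupState.setTimeoutTimestamp(). Note that the difference between them is that the first is a duration-based timeout and the second is a more flexible timestamp-based timeout.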
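To make the event-time variant more concrete, here is a minimal sketch. It assumes the events carry an event-time column, which the User class in the question does not have, so Event, OrgCount, EventTimeTimeoutSketch and the "1 hour" allowance are all hypothetical names and values chosen for illustration only.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical event and state types; the question's User has no event-time field.
case class Event(orgId: Long, eventTime: Timestamp)
case class OrgCount(orgId: Long, count: Long, lastEventMs: Long)

object EventTimeTimeoutSketch {
  // Pure helper: counts events and tracks the latest event time seen so far.
  def fold(orgId: Long, previous: Option[OrgCount], events: Iterator[Event]): OrgCount = {
    val start = previous.getOrElse(OrgCount(orgId, 0L, 0L))
    events.foldLeft(start) { (acc, e) =>
      acc.copy(
        count = acc.count + 1,
        lastEventMs = math.max(acc.lastEventMs, e.eventTime.getTime))
    }
  }

  def update(orgId: Long, events: Iterator[Event], state: GroupState[OrgCount]): OrgCount = {
    if (state.hasTimedOut) {
      // The watermark passed the timeout timestamp: emit the last value and drop the state.
      val last = state.get
      state.remove()
      last
    } else {
      val updated = fold(orgId, state.getOption, events)
      state.update(updated)
      // Expire this key once the watermark passes the latest event time plus one hour.
      state.setTimeoutTimestamp(updated.lastEventMs, "1 hour")
      updated
    }
  }
}
```

Event-time timeouts only work when a watermark is defined on the stream, so the wiring would look roughly like `events.withWatermark("eventTime", "10 minutes").groupByKey(_.orgId).mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(EventTimeTimeoutSketch.update)`, with `spark.implicits._` in scope for the encoders. The 10-minute watermark delay is again only a placeholder.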
In the ScalaDocs for the GroupState trait you will find a good template for using timeouts in a mapping function:
def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
  if (state.hasTimedOut) {                // If called when timing out, remove the state
    state.remove()
  } else if (state.exists) {              // If state exists, use it for processing
    val existingState = state.get         // Get the existing state
    val shouldRemove = ...                // Decide whether to remove the state
    if (shouldRemove) {
      state.remove()                      // Remove the state
    } else {
      val newState = ...
      state.update(newState)              // Set the new state
      state.setTimeoutDuration("1 hour")  // Set the timeout
    }
  } else {
    val initialState = ...
    state.update(initialState)            // Set the initial state
    state.setTimeoutDuration("1 hour")    // Set the timeout
  }
  ...
  // return something
}
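Applied to the types from the question, the template might look like the following sketch. The real business logic is not shown in the question, so isKnown is a hypothetical stand-in for it, the object name UserStatisticWithTimeout is made up, and the "1 hour" duration is an arbitrary placeholder.

```scala
import org.apache.spark.sql.streaming.GroupState

case class User(orgId: Long, profileId: Long, userId: Long)
case class UserStatistic(orgId: Long, known: Long, uknown: Long, userSeq: Seq[User])

object UserStatisticWithTimeout {
  // Hypothetical rule standing in for the real "is this user known?" check.
  def isKnown(user: User): Boolean = user.profileId > 0L

  // Pure helper: folds the new events into the previous statistic.
  def applyEvents(previous: UserStatistic, events: Iterator[User]): UserStatistic =
    events.foldLeft(previous) { (acc, user) =>
      if (isKnown(user)) acc.copy(known = acc.known + 1, userSeq = acc.userSeq :+ user)
      else acc.copy(uknown = acc.uknown + 1, userSeq = acc.userSeq :+ user)
    }

  def updateUserStatistic(
      orgId: Long,
      newEvents: Iterator[User],
      oldState: GroupState[UserStatistic]): UserStatistic = {
    if (oldState.hasTimedOut) {
      // No events arrived for this orgId within the timeout:
      // emit the last statistic once more and drop the state.
      val last = oldState.get
      oldState.remove()
      last
    } else {
      val previous = oldState.getOption.getOrElse(UserStatistic(orgId, 0L, 0L, Seq.empty))
      val updated = applyEvents(previous, newEvents)
      oldState.update(updated)
      // Reset the timeout every time the key receives data.
      oldState.setTimeoutDuration("1 hour")
      updated
    }
  }
}
```

For the timeout to take effect, the query has to pass `GroupStateTimeout.ProcessingTimeTimeout` instead of `GroupStateTimeout.NoTimeout` to `mapGroupsWithState`. Note that this only removes state for organizations that stop sending data; it does not shrink the state of an organization that stays active.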