Mik*_*sky 5 apache-spark apache-spark-sql spark-structured-streaming
一直在围绕Spark结构化流进行操作mapGroupsWithState(尤其是遵循Spark源代码中的StructuredSessionization示例)。我想确认mapGroupsWithState我的用例存在的一些限制。
就我而言,会话是用户的一组不间断活动,因此,两个按时间顺序排列(按事件时间而不是处理时间)的事件之间的间隔不会超过开发人员定义的持续时间(通常30分钟)。
在进入代码之前,一个示例将有所帮助:
{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}
Run Code Online (Sandbox Code Playgroud)
对于上面的流,会话定义为30分钟的不活动时间。在流媒体上下文中,我们应该以一个会话结束(第二个会话尚未完成):
{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}
Run Code Online (Sandbox Code Playgroud)
现在考虑以下Spark驱动程序:
[
{
"user_id": "mike",
"startTimestamp": "2018-01-01T00:00:00",
"endTimestamp": "2018-01-01T00:05:00"
}
]
Run Code Online (Sandbox Code Playgroud)
该程序的输出为:
root
|-- event_time: timestamp (nullable = true)
|-- user_id: string (nullable = true)
state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
| mike| 1514782800000| 1514783100000| 300000| false|
+------+----------------+--------------+----------+-------+
state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
| mike| 1514782800000| 1514785500000| 2700000| false|
+------+----------------+--------------+----------+-------+
Run Code Online (Sandbox Code Playgroud)
根据我的会话定义,第二批事件中的单个事件应触发会话状态到期,从而触发新的会话。但是,由于水印(2017-12-31 23:05:00.0)尚未通过状态的超时(2018-01-01 00:35:00.0),因此状态没有过期,并且该事件被错误地添加到现有会话中,尽管自上一批次的最新时间戳以来已过去了30分钟。
我认为,使会话状态失效的唯一方法是希望该批处理中是否收到了来自不同用户的足够多的事件,以使水印超过状态超时时间mike。
我想也可以弄乱流的水印,但我想不出要如何完成用例。
这个准确吗?在Spark中如何正确进行基于事件时间的会话化时,我是否缺少任何内容?
如果水印间隔大于会话间隙持续时间,您提供的实现似乎不起作用。
对于您所展示的逻辑,您需要将水印间隔设置为 < 30 分钟。
如果您确实希望水印间隔独立于(或大于)会话间隙持续时间,则需要等到水印经过(水印+间隙)才能使状态过期。合并逻辑似乎是盲目地合并窗口。这应该在合并之前考虑间隙持续时间。
| 归档时间: |
|
| 查看次数: |
761 次 |
| 最近记录: |