Spark Streaming: understanding the timeout setup in mapGroupsWithState

Min*_*Shi 5 apache-spark spark-structured-streaming

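I am trying hard to understand the timeout setup when using mapGroupsWithState in Spark Structured Streaming.

The link below has a very detailed specification, but I am not sure I have understood it properly, especially the GroupState.setTimeoutTimestamp() option, which means setting the state expiry relative to event time. https://spark.apache.org/docs/3.0.0-preview/api/scala/org/apache/spark/sql/streaming/GroupState.html

I copied the relevant part here: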

With EventTimeTimeout, the user also has to specify the event time watermark in the query using Dataset.withWatermark().

With this setting, data that is older than the watermark are filtered out. 
The timeout can be set for a group by setting a timeout timestamp using GroupState.setTimeoutTimestamp(), and the timeout would occur when the watermark advances beyond the set timestamp.

You can control the timeout delay by two parameters - watermark delay and an additional duration beyond the timestamp in the event (which is guaranteed to be newer than watermark due to the filtering). 

Guarantees provided by this timeout are as follows:
Timeout will never occur before the watermark has exceeded the set timeout.
Similar to processing time timeouts, there is no strict upper bound on the delay when the timeout actually occurs. The watermark can advance only when there is data in the stream, and the event time of the data has actually advanced.

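Question 1: What is the "timestamp" in the sentence "and the timeout would occur when the watermark advances beyond the set timestamp"? Is it an absolute time, or is it a duration relative to the current event time in the state? I know I could expire it by removing the state with ``.

For example, say I have some state data like the row below. When will it expire, by setting what value in what settings?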

+-------+-----------+-------------------+
|expired|something  |          timestamp|
+-------+-----------+-------------------+
|  false|   someKey |2020-08-02 22:02:00|
+-------+-----------+-------------------+

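Question 2: Reading the sentence "Data that is older than the watermark are filtered out", I understand that late-arriving data is ignored after it is read from Kafka. Is this correct?

Reason for the question: without understanding these points, I cannot really apply them to use cases, that is, when to use GroupState.setTimeoutDuration() and when to use GroupState.setTimeoutTimestamp().

Thanks a lot.

P.S. I also tried to read the following: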

- https://www.waitingforcode.com/apache-spark-structured-streaming/stateful-transformations-mapgroupswithstate/read
(confused me; I did not understand it)
- https://databricks.com/blog/2017/10/17/arbitrary-stateful-processing-in-apache-sparks-structured-streaming.html
(did not say much about the part I am interested in)

mik*_*ike 2

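What is the "timestamp" in the sentence "and the timeout would occur when the watermark advances beyond the set timestamp"?

It is the timestamp you set with GroupState.setTimeoutTimestamp().

Is it an absolute time, or is it a duration relative to the current event time in the state?

It is a point in time, not a duration: setTimeoutTimestamp() takes the timestamp as milliseconds since the epoch (or as a java.sql.Date). In practice you compute that value relative to the current batch, for example from the current watermark.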
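The value handed to setTimeoutTimestamp() is a point in time expressed as epoch milliseconds rather than a length of time. As a minimal sketch in plain Scala (no Spark required), the following turns the timestamp from the question into such a value and adds a one-hour delay. The object name, the helper names, and the choice of UTC are assumptions made for illustration only.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper object; none of these names come from Spark.
object TimeoutTimestampSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Parse an event-time string into epoch milliseconds (UTC is an assumption).
  def toEpochMs(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

  // Build an absolute timeout timestamp: a base instant plus a delay in minutes.
  def timeoutAt(baseMs: Long, delayMinutes: Long): Long =
    baseMs + delayMinutes * 60L * 1000L

  def main(args: Array[String]): Unit = {
    val eventMs   = toEpochMs("2020-08-02 22:02:00")
    val timeoutMs = timeoutAt(eventMs, 60)
    println(s"event time = $eventMs ms, timeout timestamp = $timeoutMs ms")
  }
}
```

A value like timeoutMs is what the Long overload of setTimeoutTimestamp() expects. Whether you derive it from an event time or from the current watermark is a design choice in your update logic.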

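Say I have some state data (with column timestamp=2020-08-02 22:02:00). When will it expire, by setting what value in what settings?

Let's assume your sink query has a processing trigger of 5 minutes (set by trigger()). Let's also assume that you applied a watermark before groupByKey and mapGroupsWithState. I understand you want to use timeouts based on event time (as opposed to processing time), so your query will look like this: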

ds.withWatermark("timestamp", "10 minutes")
  .groupByKey(...) // declare your key
  .mapGroupsWithState(
    GroupStateTimeout.EventTimeTimeout)(
    ...) // your custom update logic

Now it depends on how you set the timeout timestamp in your "custom update logic". Somewhere in your custom update logic you will need to call

state.setTimeoutTimestamp()

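The method has four different signatures, and it is worth scanning through their documentation. Since we have set a watermark (in withWatermark), we can actually make use of that time. As a general rule, it is important to set the timeout timestamp (set by state.setTimeoutTimestamp()) to a value larger than the current watermark. To continue with our example, we add one hour as shown below: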

state.setTimeoutTimestamp(state.getCurrentWatermarkMs, "1 hour")

To conclude, your message can arrive in your stream between 22:00:00 and 22:15:00, and if that message was the last one for the key, it will time out by 23:15:00 in your GroupState.
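The timing can be checked with a small model in plain Scala. This is only a sketch under simplifying assumptions: the watermark is modelled as "highest event time seen minus the watermark delay", times are milliseconds since midnight, and all names are hypothetical. It illustrates the rule quoted from the docs: the timeout fires only once the watermark has advanced beyond the set timestamp, which requires newer data to arrive.

```scala
// Hypothetical model of event-time timeouts; not Spark code.
object EventTimeTimeoutSketch {
  val MinuteMs: Long = 60L * 1000L

  // Milliseconds since midnight for a wall-clock time, to keep numbers readable.
  def at(hour: Int, minute: Int): Long = (hour * 60L + minute) * MinuteMs

  // Simplified watermark: highest event time seen so far minus the delay.
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  // Per the quoted docs: timeout occurs when the watermark is beyond the timestamp.
  def hasTimedOut(watermarkMs: Long, timeoutTimestampMs: Long): Boolean =
    watermarkMs > timeoutTimestampMs

  def main(args: Array[String]): Unit = {
    val delay   = 10 * MinuteMs
    // Timeout timestamp set to "watermark of 22:15 plus 1 hour", i.e. 23:15.
    val timeout = at(22, 15) + 60 * MinuteMs
    // Data up to 23:20 gives a watermark of 23:10: the state is still alive.
    println(hasTimedOut(watermark(at(23, 20), delay), timeout))
    // Data up to 23:30 gives a watermark of 23:20: the state times out.
    println(hasTimedOut(watermark(at(23, 30), delay), timeout))
  }
}
```

In this model the state set to expire at 23:15:00 is only removed after an event later than 23:25:00 has been seen, which matches the docs' note that there is no strict upper bound on when the timeout actually occurs.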

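Question 2: Reading the sentence "Data that is older than the watermark are filtered out", I understand that late-arriving data is ignored after it is read from Kafka. Is this correct?

Yes, this is correct. For the batch interval 22:00:00 - 22:05:00, all messages with an event time (defined by the column timestamp) that arrive later than the declared watermark of 10 minutes (meaning later than 22:15:00) are ignored by your query anyway and are not processed in your "custom update logic".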
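The filtering described in the quoted docs can be pictured with a minimal plain-Scala sketch. It assumes the watermark is already known for the batch and treats "older than the watermark" as a strict comparison; the exact boundary handling inside Spark is not modelled here, and the Event class and dropLate helper are hypothetical names.

```scala
// Hypothetical model of late-data filtering; not Spark code.
object LateDataSketch {
  final case class Event(key: String, eventTimeMs: Long)

  // Keep only events whose event time is not older than the watermark.
  def dropLate(events: Seq[Event], watermarkMs: Long): Seq[Event] =
    events.filter(_.eventTimeMs >= watermarkMs)

  def main(args: Array[String]): Unit = {
    val watermarkMs = 1000L
    val batch = Seq(Event("someKey", 500L), Event("someKey", 1500L))
    // Only the event at 1500 ms survives; the one at 500 ms is dropped as late.
    println(dropLate(batch, watermarkMs))
  }
}
```

Only the surviving events would reach the custom update logic, so the state function never sees a record older than the watermark.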