我有一个配置有 Kafka 连接器的 Flink 管道。
我使用以下命令将水印生成频率设置为 2 秒:
env.getConfig().setAutoWatermarkInterval(2000);
Run Code Online (Sandbox Code Playgroud)
现在,对于流窗口,我的滚动窗口为 60 秒,我们在其中进行一些聚合,并且根据数据字段之一的时间戳进行基于事件时间的处理。
我尚未为我的水印策略或我的流配置 allowedLateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Run Code Online (Sandbox Code Playgroud)
Q.1 根据我所读到的内容,我的时间 0-60 的窗口将在 90 秒后计算,30-90 在 120 秒后计算,依此类推。然而,由于我们正在做翻滚窗口,即没有重叠,我的猜测是没有 30-90 窗口,0-60 之后的下一个窗口是 60-120,在 150 秒标记处触发,我是对的吗?
Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。90 秒后到达的时间戳为 45 的事件被认为是无序的,将超出第一个窗口,即 0-60。对于窗口 60-120,事件时间戳不匹配,因此它将被丢弃并且不包含在窗口在 150 秒处触发,对吗?
Q.3. 对于源空闲持续时间,我选择 120,表示如果该主题的任何 Kakfa 分区不活动且有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是选择这个数字,即 2 分钟,以及它是否与窗口持续时间(60 秒)或无序(30 秒)有关。如果是这样,我应该在这里记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非前进水印而导致数据滞留?
或者 120 的等待时间太长,我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间以确保 …