Ace*_*oud 2 java apache-kafka apache-flink flink-streaming
我有一个配置有 Kafka 连接器的 Flink 管道。
我使用以下命令将水印生成频率设置为 2 秒:
env.getConfig().setAutoWatermarkInterval(2000);
Run Code Online (Sandbox Code Playgroud)
现在,对于流窗口,我的滚动窗口为 60 秒,我们在其中进行一些聚合,并且根据数据字段之一的时间戳进行基于事件时间的处理。
我尚未为我的水印策略或我的流配置 allowedLateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Run Code Online (Sandbox Code Playgroud)
Q.1 根据我所读到的内容,我的时间 0-60 的窗口将在 90 秒后计算,30-90 在 120 秒后计算,依此类推。然而,由于我们正在做翻滚窗口,即没有重叠,我的猜测是没有 30-90 窗口,0-60 之后的下一个窗口是 60-120,在 150 秒标记处触发,我是对的吗?
Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。90 秒后到达的时间戳为 45 的事件被认为是无序的,将超出第一个窗口,即 0-60。对于窗口 60-120,事件时间戳不匹配,因此它将被丢弃并且不包含在窗口在 150 秒处触发,对吗?
Q.3. 对于源空闲持续时间,我选择 120,表示如果该主题的任何 Kakfa 分区不活动且有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是选择这个数字,即 2 分钟,以及它是否与窗口持续时间(60 秒)或无序(30 秒)有关。如果是这样,我应该在这里记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非前进水印而导致数据滞留?
或者 120 的等待时间太长,我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间以确保 0 数据丢失?
编辑:添加了更多代码
Q1:是的,没错。
Q2:是的,这也是正确的。
Q3:这里的详细信息取决于您是否让 Kafka 源应用 WatermarkStrategy(在这种情况下,它将执行每个分区的水印),或者 WatermarkStrategy 是否作为单独的运算符部署在源运算符之后的某个位置(通常立即链接在源运算符之后) 。
在第一种情况下(由 完成每个分区水印FlinkKafkaConsumer),您将执行如下操作:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);
DataStream<MyType> stream = env.addSource(kafkaSource);
Run Code Online (Sandbox Code Playgroud)
而在源之后单独进行水印则如下所示:
DataStream<MyType> events = env.addSource(...);
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration ...)
.withTimestampAssigner((event, timestamp) -> event.timestamp));
Run Code Online (Sandbox Code Playgroud)
当水印在每个分区的基础上完成时,单个空闲分区将保留处理该分区的消费者/源实例的水印——直到空闲超时开始(在您的示例中为 120 秒)。相比之下,如果水印是在链接到源的单独运算符中完成的,则只有当分配给该源实例的所有分区(具有空闲分区的分区)都空闲时,水印才会被保留(同样,对于 120秒)。
但无论这些细节如何,都希望不会丢失数据。将有一段时间窗口不会被触发(因为水印没有前进),但事件将继续被处理并分配给适当的窗口。一旦水印恢复,这些窗口将关闭并提供结果。
发生数据丢失的情况是,如果分区空闲,因为上游的某些故障导致中断,最终产生一堆延迟事件。空闲超时到期后,水位线将前进,如果源空闲是因为上游某些东西被破坏(而不是因为根本没有事件),那么最终到达的那些事件将迟到(除非你的边界超出-有序延迟足够大以容纳它们)。如果您选择忽略迟到的事件,那么这些事件将会丢失。
| 归档时间: |
|
| 查看次数: |
2236 次 |
| 最近记录: |