luc*_*umi 0 apache-flink flink-streaming
我正在尝试在我的 Flink 作业中使用事件时间,并BoundedOutOfOrdernessTimestampExtractor用于提取时间戳并生成水印。但是我有一些输入Kafka具有稀疏流,它可以长时间没有数据,这使得根本没有调用getResultin AggregateFunction。我可以看到数据正在add发挥作用。
我已经设置getEnv().getConfig().setAutoWatermarkInterval(1000L);
我试过了
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
.allowedLateness(WINDOW_LATENESS)
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
还有会话窗口
eventsWithKey
.keyBy(entry -> (String) entry.get(key))
.window(EventTimeSessionWindows.withGap(Time.seconds(30)))
.aggregate(new CountTask(basicMetricTags, windowInMinutes))
Run Code Online (Sandbox Code Playgroud)
所有水印指标都显示No Watermark
如何让 Flink 忽略无水印的东西?
仅供参考,这通常被称为“空闲源”问题。发生这种情况是因为每当 Flink 算子有两个或更多输入时,它的水印就是其输入中最小的水印。如果这些输入之一停止,则其水印不再前进。
请注意,Flink 没有每个键的水印——一个给定的操作符通常跨多个键的事件复用。只要某些事件流经给定任务的输入流,其水印就会提前,空闲键的事件时间计时器仍将触发。要发生这种“空闲源”问题,任务必须有一个完全空闲的输入流。
如果可以安排,最好的解决方案是让您的数据源包含 keepalive 事件。这将使您能够自信地推进水印,因为知道源只是闲置,而不是例如离线。
如果这是不可能的,并且如果您有一些未空闲的源,那么您可以将 arebalance()放在 the 前面BoundedOutOfOrdernessTimestampExtractor(和 keyBy 之前),以便每个实例继续接收一些事件并可以推进其水印。这是以额外的网络洗牌为代价的。
也许最常用的解决方案是使用水印生成器来检测空闲并根据处理时间计时器人为地推进水印。ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor就是一个例子。