Mat*_*ard 5 java apache-kafka apache-kafka-streams
当通过 kafka-steams 应用程序推送批量数据时,我看到它多次记录以下消息......
WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment.
...我希望通过 leftJoin 步骤连接的数据似乎丢失了。
我在实践中看到过这种情况,无论是当我的应用程序关闭一段时间然后重新启动时,或者当我使用类似 app -reset-tool 的东西来尝试让应用程序重新处理过去的数据时。
我能够通过向相隔一小时的两个主题生成 1000 条消息(按顺序排列原始时间戳)来单独重现此行为,然后让 kafka 流为它们选择一个密钥并尝试 leftJoin 这两个重新设置密钥的流。
该复制品的自包含源代码可在https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java获取
实际的 kafka-streams 拓扑如下所示。
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> leftStream = builder.stream(leftTopic);
final KStream<String, String> rightStream = builder.stream(rightTopic);
final KStream<String, String> rekeyedLeftStream = leftStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));
final KStream<String, String> rekeyedRightStream = rightStream
.selectKey((k, v) -> v.substring(0, v.indexOf(":")));
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
rekeyedRightStream,
(left, right) -> left + "/" + right,
joinWindow
);
Run Code Online (Sandbox Code Playgroud)
...我产生的最终输出看起来像这样...
...
523 [523,left/null]
524 [524,left/null, 524,left/524,right]
525 [525,left/525,right]
526 [526,left/null]
527 [527,left/null]
528 [528,left/528,right]
529 [529,left/null]
530 [530,left/null]
531 [531,left/null, 531,left/531,right]
532 [532,left/null]
533 [533,left/null]
534 [534,left/null, 534,left/534,right]
535 [535,left/null]
536 [536,left/null]
537 [537,left/null, 537,left/537,right]
538 [538,left/null]
539 [539,left/null]
540 [540,left/null]
541 [541,left/null]
542 [542,left/null]
543 [543,left/null]
...
Run Code Online (Sandbox Code Playgroud)
...鉴于输入数据,我希望看到每一行都以连接的两个值结束,而不是正确的值为空。
(请注意,我们最初获得每个值的左/空值是很好/预期的 - 这是我所理解的 kafka-streams left join 的预期语义。)
我注意到,如果我在连接窗口上设置一个非常大的宽限值,问题就解决了,但由于我提供的输入没有乱序,我没想到需要这样做,而且我厌倦了在具有大量容量的应用程序上实际执行此操作的资源需求。
我怀疑正在发生一些事情,当处理一个分区时,它会导致流时间被前推到该分区中的最新消息,这意味着当检查下一个分区时,会发现它包含许多“太”的记录与流时间相比“旧”。然而,我希望有人可以向我指出一个设置来改变这种行为,或者一些其他解决方案,以避免在应用程序赶上积压的数据时产生不准确的结果,而不会造成很大的性能开销。
您在空间中发布消息 1 小时后就无法加入?
现在您正在使用:
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
Run Code Online (Sandbox Code Playgroud)
将其更改为更高的数字或添加宽限期将允许您处理更多消息,消息间隔为 1 小时,并且您有 1000 条消息,因此值:
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofDays(42));
Run Code Online (Sandbox Code Playgroud)
是需要的。(因为 1000 * 5h 接近 42 天)
因此您需要根据数据大小调整该值,以便始终能够对您期望的所有消息执行此操作。
然后我得到你期望的结果......我想是的。因为我不太确定这里的其他空值,但你似乎说它是预期的 - 没有分析该部分。有些整体确实拥有它,而另一些则没有。
11 [11:left/null, 11:left/11:right]
12 [12:left/12:right]
13 [13:left/null, 13:left/13:right]
14 [14:left/null, 14:left/14:right]
15 [15:left/null, 15:left/15:right]
16 [16:left/null, 16:left/16:right]
17 [17:left/17:right]
18 [18:left/null, 18:left/18:right]
19 [19:left/null, 19:left/19:right]
20 [20:left/null, 20:left/20:right]
21 [21:left/null, 21:left/21:right]
22 [22:left/null, 22:left/22:right]
23 [23:left/null, 23:left/23:right]
24 [24:left/null, 24:left/24:right]
25 [25:left/25:right]
26 [26:left/null, 26:left/26:right]
Run Code Online (Sandbox Code Playgroud)
但所有 1000 个结果始终都有预设的有效对。
您想要旧数据,因此您必须同意旧数据。
但据我了解,有一个非常长的宽限期将是昂贵的
如果您的宽限期比您需要的长得多,那么成本可能会很高,但在这种情况下,这正是您所需要的。除非你可以完全避免这样做。
正如您在文档中看到的,恩典完全满足您的要求(或者更确切地说,您不想要的,其默认的低值):
https: //kafka.apache.org/22/javadoc/org/apache/kafka/流/kstream/JoinWindows.html#grace-java.time.Duration-
拒绝在窗口结束后超过 afterWindowEnd 到达的延迟事件。迟到被定义为(stream_time - record_timestamp)。
另一种选择是使用更大的窗口,但这看起来不是适合您的情况的正确解决方案:
JoinWindows joinWindow = JoinWindows.of(Duration.ofDays(42);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1048 次 |
| 最近记录: |