我们有以下高级DSL处理拓扑:
TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);
KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");
KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");
KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")
Run Code Online (Sandbox Code Playgroud)
简而言之,我们上面所做的是:
我们的想法是创建窗口事件计数并使用这些窗口键进行连接和聚合操作(在KTable案例中,这些操作没有窗口)
问题是:连接和聚合操作的状态存储没有保留机制,导致磁盘空间爆炸(RocksDB).
更具体地说:(跳跃)窗口导致键上的笛卡尔积,并且没有删除旧窗口的机制.
如果KTable键没有窗口化,而且只有足够多的唯一键,也会出现同样的问题
请注意,状态存储支持table1和table2没有空间问题,这是因为它们通过DSL提供了一个窗口存储来管理丢弃旧窗口.在连接和聚合中,我们将窗口密钥视为"任何旧密钥",DSL也使用非窗口KeyValueStore.
此问题与以下内容有关:KAFKA-4212,KAFKA-4273,汇合论坛问题
这里有没有被误解的概念?有没有一种使用DSL实现此拓扑的简单方法?如果没有,使用低级API实现它的建议方法是什么?