小编gle*_*man的帖子

KTable状态存储无限保留

我们有以下高级DSL处理拓扑:

TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")
Run Code Online (Sandbox Code Playgroud)

简而言之,我们上面所做的是:

  1. 使用跳跃窗口计数事件
  2. 在生成的KTable之间进行左连接(由于业务逻辑而离开)
  3. 分组和更改键和值:获取键的组件(长整数)并移动到该值
  4. 将生成的KTable聚合到最终的KTable,聚合对象是从T到步骤1中的两个连接计数器的映射.请注意,映射的大小不超过600,通常要小得多.

我们的想法是创建窗口事件计数并使用这些窗口键进行连接和聚合操作(在KTable案例中,这些操作没有窗口)

问题是:连接聚合操作的状态存储没有保留机制,导致磁盘空间爆炸(RocksDB).

更具体地说:(跳跃)窗口导致键上的笛卡尔积,并且没有删除旧窗口的机制.

如果KTable键没有窗口化,而且只有足够多的唯一键,也会出现同样的问题

请注意,状态存储支持table1和table2没有空间问题,这是因为它们通过DSL提供了一个窗口存储来管理丢弃旧窗口.在连接和聚合中,我们将窗口密钥视为"任何旧密钥",DSL也使用非窗口KeyValueStore.

此问题与以下内容有关:KAFKA-4212,KAFKA-4273,汇合论坛问题

这里有没有被误解的概念?有没有一种使用DSL实现此拓扑的简单方法?如果没有,使用低级API实现它的建议方法是什么?

apache-kafka-streams

7
推荐指数
1
解决办法
1524
查看次数

标签 统计

apache-kafka-streams ×1