我有一个聚合在 KTable 上的拓扑。这是我创建的通用方法,用于根据我拥有的不同主题构建此拓扑。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
Run Code Online (Sandbox Code Playgroud)
这在使用 Kafka 时效果很好,但在通过“TopologyTestDriver”进行测试时则不然。
在这两种情况下,当我获得更新时,subtractor首先调用 ,然后adder调用 。问题是,使用 时TopologyTestDriver,会发送两条消息以进行更新:一条在调用后subtractor,另一条在adder调用后。更不用说在之后subrtractor和之前发送的消息adder处于不正确的阶段。
其他人可以确认这是一个错误吗?我已经针对 …