emi*_*ini 5 scala apache-kafka apache-kafka-streams spring-kafka
我有一个有数据流的主题。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。
我认为 KTable 的全部目的是存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它发挥作用。运行下面的代码会生成密钥库,但该密钥库 (maintopiclatest) 中有一个事件流(不仅仅是最新的值)。因此,如果我在主题中两次发送包含 1000 条记录的请求,而不是看到 1000 条记录,而是看到 2000 条记录。
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();
var stream = kStreamBuilder.stream("maintopic",
Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));
var table = stream
.groupByKey()
.reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));
Run Code Online (Sandbox Code Playgroud)
另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,似乎我必须把它转回一个流,以便我可以在它上面调用“.to”。但是,其中包含整个事件流,而不仅仅是最新的值。
这不是 KTable 的工作方式。
KTable 本身有一个内部状态存储,并且每个键只存储一条记录。然而,KTable 会不断更新并受到所谓的流表二元性的影响。KTable 的每次更新都会作为变更日志记录发送到下游:https ://docs.confluence.io/current/streams/concepts.html#duality-of-streams-and-tables 。因此,每个输入记录都会产生一个输出记录。
因为它是流处理,所以不存在“每个值的最后一个键”。
我有一个主题,其中有数据流。我需要的是从该主题创建一个单独的主题,该主题仅包含给定键的最新值集。
您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。
归档时间: |
|
查看次数: |
3261 次 |
最近记录: |