如何在 kafka 主题中仅存储最新的键值

emi*_*ini 5 scala apache-kafka apache-kafka-streams spring-kafka

我有一个有数据流的主题。我需要的是从该主题创建一个单独的主题,该主题仅具有给定键的最新值集。

我认为 KTable 的全部目的是存储给定键的最新值,而不是存储整个事件流。但是我似乎无法让它发挥作用。运行下面的代码会生成密钥库,但该密钥库 (maintopiclatest) 中有一个事件流(不仅仅是最新的值)。因此,如果我在主题中两次发送包含 1000 条记录的请求,而不是看到 1000 条记录,而是看到 2000 条记录。

var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();

var stream = kStreamBuilder.stream("maintopic",
    Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));

var table = stream
    .groupByKey()
    .reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));
Run Code Online (Sandbox Code Playgroud)

另一个问题是,如果我想将 KTable 存储在一个新主题中,我不知道该怎么做。为了做到这一点,似乎我必须把它转回一个流,以便我可以在它上面调用“.to”。但是,其中包含整个事件流,而不仅仅是最新的值。

Mat*_*Sax 3

这不是 KTable 的工作方式。

KTable 本身有一个内部状态存储,并且每个键只存储一条记录。然而,KTable 会不断更新并受到所谓的流表二元性的影响。KTable 的每次更新都会作为变更日志记录发送到下游:https ://docs.confluence.io/current/streams/concepts.html#duality-of-streams-and-tables 。因此,每个输入记录都会产生一个输出记录。

因为它是流处理,所以不存在“每个值的最后一个键”。

我有一个主题,其中有数据流。我需要的是从该主题创建一个单独的主题,该主题仅包含给定键的最新值集。

您希望 KTable 在哪个时间点发出更新?这个问题没有答案,因为输入流在概念上是无限的。

  • 仅在变更日志主题被“压缩”后,旧值才会被删除,并且仅保留每个键的最新记录。请注意,如果您处理 KTable,则不会处理快照,而是处理变更日志主题。-- KTables 是具体化的客户端(默认情况下使用 RocksDB 键值存储)。如果您想“时间点”查找当前的 KTable 状态,可以使用“交互式查询”:https://docs.confluence.io/current/streams/developer-guide/interactive-queries.html (4认同)
  • “当前状态”意味着“处理的当前状态”——如果您默认执行`builder.table("topic")`,则处理从最早的偏移量开始。因此,在处理过程中,完整的主题被消耗,并且 KTable 状态被更新、更新、更新。假设上游生产者应用程序不断写入“主题”,处理永远不会完成(注意,这是正常情况,因为它是流处理)。因此,在处理“当前状态”期间,对应于“开始到当前偏移”的数据的状态(当前偏移 < topic-end-offset) (3认同)
  • “我认为 KTable 的整个目的类似于数据库表,仅通过键存储‘最新’值”——这是正确的,但您必须将其视为随时间演变的“快照”。“我很困惑”——这种情况经常发生——这是一种完全不同的方法,你需要建立一个新的思维模型(这需要时间......)——你还必须区分*当前的* KTable状态及其变更日志主题——当前状态每个键只存储一条记录——但是,变更日志主题存储*在一段时间内*的更新。 (2认同)
  • 正确的。对于“toStream()”,它为您提供完整的历史记录(注意,当您到达输入主题的末尾时,“toStream()”不会开始处理,但它在处理输入主题时同时运行 - - 它基本上在处理时创建所有表更新的变更日志流)。如果您想迭代存储中的所有键(对于某些“快照”),您可以通过“store.all()”使用 IQ。 (2认同)