Kafka Stream:KTable 物化

Ram*_*man 2 apache-kafka apache-kafka-streams

如何识别主题的 KTable 实现何时完成?

例如,假设 KTable 有几百万行。伪代码如下:

KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows
Run Code Online (Sandbox Code Playgroud)

在某个时间点,我想安排一个线程来调用以下写入主题的线程: kt.toStream().to("output_topic_name");

我想确保所有数据都作为上述调用的一部分写入。此外,一旦调用上述“to”方法,是否可以在下一个计划中调用它,或者第一个调用是否始终保持活动状态?

后续问题:

约束
1) 好的,我看到一旦 kafkastream 启动,kstream 和 ktable 是无界/无限的。但是,ktable 实现(到压缩的主题)不会在指定时间段内为同一键发送多个条目。

因此,除非压缩过程尝试清理这些并仅保留最新的,否则下游应用程序将使用从主题查询的相同键的所有可用条目,从而导致重复。即使压缩过程进行了某种程度的清理,也总是不可能在给定的时间点,随着压缩过程的追赶,某些键具有多个条目。

我假设 KTable 在 RocksDB 中只有一个给定键的记录。如果我们有办法安排实现,这将有助于避免重复。此外,减少在主题中持久化的数据量(增加存储量),增加网络流量,压缩过程的额外开销以清理它。

2) 也许 ReadOnlyKeyValueStore 将允许从存储中进行受控检索,但它仍然缺乏调度键、值和写入主题的方法,这需要额外的编码。

是否可以改进 API 以允许受控实现?

Mat*_*Sax 5

KTable 实现永远不会完成,您也不能“调用”a to()

当您使用 Streams API 时,您“插入”了一个 DAG 运算符。实际的方法调用,不会触发任何计算,而是修改算子的 DAG。

只有在您通过KafkaStreams#start()数据开始计算后才会被处理。请注意,您指定的所有运算符都将在计算开始后连续并发运行。

没有“计算结束”,因为输入预计是无界/无限的,因为上游应用程序可以随时将新数据写入输入主题。因此,您的程序永远不会自行终止。如果需要,您可以通过KafkaStreams#close()虽然停止计算。

在执行期间,您无法更改 DAG。如果你想改变它,你需要停止计算并创建一个新的KafkaStreams实例,将修改后的 DAG 作为输入

跟进:

是的。您必须将 KTable 视为“版本化表”,当条目更新时,它会随着时间的推移而演变。因此,所有更新都写入变更日志主题并作为变更记录发送到下游(请注意,KTables 也会进行一些缓存,以“删除”对同一密钥的连续更新:参见https://docs.confluent .io/current/streams/developer-guide/memory-mgmt.html)。

将消耗从主题查询的相同键的所有可用条目,从而导致重复。

我不会将它们视为“重复”,而是将其视为更新。是的,应用程序需要能够正确处理这些更新。

如果我们有办法安排实现,这将有助于避免重复。

具体化是一个连续的过程,只要输入主题中有新的输入记录可用并被处理,KTable 就会更新。因此,在任何时间点都可能有特定密钥的更新。因此,即使您完全控制何时向变更日志主题和/或下游发送更新,稍后也可能会有新的更新。这就是流处理的本质。

此外,减少在主题中持久化的数据量(增加存储量),增加网络流量,压缩过程的额外开销以清理它。

如上所述,缓存用于节省资源。

是否可以改进 API 以允许受控实现?

如果提供的 KTable 语义不符合您的要求,您始终可以将自定义运算符编写为Processoror Transformer,为其附加键值存储,并实现您需要的任何内容。