sen*_*iwu 3 apache-kafka apache-kafka-streams
在我的公司,我们广泛使用Kafka,但出于容错原因,我们一直使用关系数据库来存储多个中间转换和聚合的结果.现在我们正在探索Kafka Streams,这是一种更自然的方式.通常,我们的需求很简单 - 就是这样的一个例子
<K1,V1>, <K2,V2>, <K1,V2>, <K1,V3>...
<K1,V1>
被处理,并且两者<K1,V2>, <K1,V3>
都已经生成,那么我应该处理V3,因为V2已经变得陈旧为了达到这个目的,我正在阅读这个主题KTable
.代码如下所示
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> kTable = builder.table("input-topic");
kTable.toStream().foreach((K,V) -> client.post(V));
return builder;
Run Code Online (Sandbox Code Playgroud)
这可以按预期工作,但我不清楚Kafka如何自动实现这一目标.我假设Kafka创建了内部主题来实现这一目标,但我没有看到任何内部主题被创建.该方法的Javadoc似乎证实了这一观察结果.但后来我遇到了这个官方页面,似乎表明Kafka使用了一个单独的数据存储区,即RocksDB以及一个更改日志主题.
现在我很困惑,因为在哪种情况下创建了changelog主题.我的问题是
我使用的是0.11.0
Kafka Streams在当地存储州.默认情况下使用RocksDB.然而,当地的国家是短暂的.对于容错,对商店的所有更新也会写入changelog主题.这允许在发生故障或扩展/缩小的情况下重建和/或迁移存储.对于您的特殊情况,不会创建任何更改日志主题,因为KTable
它不是聚合的结果,而是直接从主题填充 - 这只是一个优化.由于changelog主题将包含与输入主题完全相同的数据,因此Kafka Streams可避免数据重复,并在出现错误情况时将输入主题用作changelog主题.
不确定这个问题究竟是什么意思.请注意,RocksDB被认为是一个短暂的商店.默认情况下,它出于各种原因使用,如下所述:为什么Apache Kafka Streams使用RocksDB,以及如何更改它?(例如,它允许保持大于主内存的状态,因为它可以溢出到磁盘).您可以将RocksDB替换为任何其他商店.Kafka Streams还附带一个内存商店.(编辑)
那是对的.您需要相应地配置应用程序,以便能够存储整个状态的本地分片.有一个大小调整指南:https://docs.confluent.io/current/streams/sizing.html
RocksDB是用C++编写的,并通过JNI绑定包含在内.Windows上存在一些已知问题,因为RocksDB不为所有版本的Windows提供预编译的二进制文件.只要你继续使用基于Linux的平台,就可以了.Kafka社区为RocksDB运行升级测试,以确保它兼容.
是.Kafka Streams实际上假定table()
操作的输入主题是日志压缩的.否则,在发生故障时存在数据丢失的风险.(编辑)
null
写入带有value =的逻辑删除消息).注意,当在代理端执行压缩时,旧数据被垃圾收集,因此,在恢复时,只读取每个密钥的新版本 - 在压缩过程中删除旧版本.如果您在一段时间后对某些数据不感兴趣,则需要在源主题中编写一个逻辑删除以使其工作. 归档时间: |
|
查看次数: |
677 次 |
最近记录: |