Kafka Streams内部数据管理

sen*_*iwu 3 apache-kafka apache-kafka-streams

在我的公司,我们广泛使用Kafka,但出于容错原因,我们一直使用关系数据库来存储多个中间转换和聚合的结果.现在我们正在探索Kafka Streams,这是一种更自然的方式.通常,我们的需求很简单 - 就是这样的一个例子

  • 听取输入队列 <K1,V1>, <K2,V2>, <K1,V2>, <K1,V3>...
  • 对于每条记录,执行一些高延迟操作(调用远程服务)
  • 如果在时间<K1,V1>被处理,并且两者<K1,V2>, <K1,V3>都已经生成,那么我应该处理V3,因为V2已经变得陈旧

为了达到这个目的,我正在阅读这个主题KTable.代码如下所示

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> kTable = builder.table("input-topic");
kTable.toStream().foreach((K,V) -> client.post(V));
return builder;
Run Code Online (Sandbox Code Playgroud)

这可以按预期工作,但我不清楚Kafka如何自动实现这一目标.我假设Kafka创建了内部主题来实现这一目标,但我没有看到任何内部主题被创建.该方法的Javadoc似乎证实了这一观察结果.但后来我遇到了这个官方页面,似乎表明Kafka使用了一个单独的数据存储区,即RocksDB以及一个更改日志主题.

现在我很困惑,因为在哪种情况下创建了changelog主题.我的问题是

  1. 如果官方页面建议的状态存储的默认行为是容错的,那么该状态存储在哪里?在RocksDB?在更改日志主题或两者?
  2. 在生产中依赖RocksDB有什么影响?(编者)
    1. 据我所知,对rocksdb的依赖是透明的(只是一个jar文件)而rocksdb将数据存储在本地文件系统中.但这也意味着在我们的情况下,该应用程序将在运行应用程序的存储上维护分片数据的副本.当我们用KTable替换远程数据库时,它具有存储意义,这是我的观点.
    2. Kafka发布会注意RocksDB将继续在各种平台上运行吗?(因为它似乎是依赖于平台的,而不是用Java编写的)
  3. 使输入主题日志压缩是否有意义?

我使用的是0.11.0

Mat*_*Sax 5

  1. Kafka Streams在当地存储州.默认情况下使用RocksDB.然而,当地的国家是短暂的.对于容错,对商店的所有更新也会写入changelog主题.这允许在发生故障或扩展/缩小的情况下重建和/或迁移存储.对于您的特殊情况,不会创建任何更改日志主题,因为KTable它不是聚合的结果,而是直接从主题填充 - 这只是一个优化.由于changelog主题将包含与输入主题完全相同的数据,因此Kafka Streams可避免数据重复,并在出现错误情况时将输入主题用作changelog主题.

  2. 不确定这个问题究竟是什么意思.请注意,RocksDB被认为是一个短暂的商店.默认情况下,它出于各种原因使用,如下所述:为什么Apache Kafka Streams使用RocksDB,以及如何更改它?(例如,它允许保持大于主内存的状态,因为它可以溢出到磁盘).您可以将RocksDB替换为任何其他商店.Kafka Streams还附带一个内存商店.(编辑)

    1. 那是对的.您需要相应地配置应用程序,以便能够存储整个状态的本地分片.有一个大小调整指南:https://docs.confluent.io/current/streams/sizing.html

    2. RocksDB是用C++编写的,并通过JNI绑定包含在内.Windows上存在一些已知问题,因为RocksDB不为所有版本的Windows提供预编译的二进制文件.只要你继续使用基于Linux的平台,就可以了.Kafka社区为RocksDB运行升级测试,以确保它兼容.

  3. 是.Kafka Streams实际上假定table()操作的输入主题是日志压缩的.否则,在发生故障时存在数据丢失的风险.(编辑)

    1. 如果启用日志压缩,则会忽略保留时间设置.因此,是的,最新的更新将永远保持(或直到null写入带有value =的逻辑删除消息).注意,当在代理端执行压缩时,旧数据被垃圾收集,因此,在恢复时,只读取每个密钥的新版本 - 在压缩过程中删除旧版本.如果您在一段时间后对某些数据不感兴趣,则需要在源主题中编写一个逻辑删除以使其工作.