mdm*_*mdm 2 scala apache-kafka apache-kafka-streams
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),所以我没有使用TopologyTestDriver.
我的拓扑具有键值类型的输入String -> Customer和String -> CustomerMapped. Serdes、模式以及与模式注册表的集成都按预期工作。
我正在使用 Scala、Kafka 2.2.0、Confluence Platform 5.2.1 和kafka-streams-scala. 我的拓扑尽可能简化,如下所示:
val otherBuilder = new StreamsBuilder()
otherBuilder
.table[String,Customer](source)
.mapValues(c => CustomerMapped(c.surname, c.age))
.toStream.to(target)
Run Code Online (Sandbox Code Playgroud)
(所有隐式 serdes、Produced、Consumed等都是默认的,并且可以正确找到)
我的测试包括同步且不间断地向主题发送一些记录 ( data) ,然后从主题读回,我将结果与:sourcetargetexpected
val data: Seq[(String, Customer)] = Vector(
"key1" -> Customer(0, "Obsolete", "To be overridden", 0),
"key1" -> Customer(0, "Obsolete2", "To be overridden2", 0),
"key1" -> Customer(1, "Billy", "The Man", 32),
"key2" -> Customer(2, "Tommy", "The Guy", 31),
"key3" -> Customer(3, "Jenny", "The Lady", 40)
)
val expected = Vector(
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
Run Code Online (Sandbox Code Playgroud)
我构建了 Kafka Stream 应用程序,在其他设置之间进行设置,以下两个:
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000")
val s: Long = 50L * 1024 * 1024
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, s.toString)
Run Code Online (Sandbox Code Playgroud)
因此,我希望 KTable 使用缓存,提交之间的间隔为 5 秒,缓存大小为 50MB(对于我的场景来说绰绰有余)。
我的问题是,我从主题读回的结果target始终包含key1. Obsolete我预计不会为带有“Obsolete1”的记录发出任何事件。实际输出是:
Vector(
"key1" -> CustomerMapped("To be overridden", 0),
"key1" -> CustomerMapped("To be overridden2", 0),
"key1" -> CustomerMapped("The Man", 32),
"key2" -> CustomerMapped("The Guy", 31),
"key3" -> CustomerMapped("The Lady", 40)
)
Run Code Online (Sandbox Code Playgroud)
最后要提的是:这个测试曾经按预期工作,直到我将 Kafka 从 2.1.0 更新到 2.2.0。我再次验证了我的应用程序降级。
我很困惑,谁能指出 2.2.x 版本中 KTables 的行为是否发生了变化?或者也许现在我必须设置新的设置来控制事件的发出?
在 Kafka 2.2 中,引入了优化来减少 Kafka Streams 的资源占用。KTable如果计算不需要A ,则不一定将其具体化。这适用于您的情况,因为mapValues()可以即时计算。因为KTable没有具体化,所以没有缓存,因此每个输入记录都会生成一个输出记录。
比较: https: //issues.apache.org/jira/browse/KAFKA-6036
如果你想强制KTable实现,你可以传入Materilized.as("someStoreName")方法StreamsBuilder#table()。