cod*_*ent 4 apache-kafka apache-kafka-streams
我有以下拓扑:
但是我在 STATIONS_LOW_CAPACITY_TOPIC 上看到了这个:
? null
? null
? null
? {"id":140,"latitude":"40.4592351","longitude":"-3.6915330",...}
? {"id":137,"latitude":"40.4591366","longitude":"-3.6894151",...}
? null
Run Code Online (Sandbox Code Playgroud)
也就是说,就好像它也将那些没有通过过滤器的记录发布到 STATIONS_LOW_CAPACITY_TOPIC 主题。这怎么可能?我怎样才能防止它们被发布?
这是 ksteams 代码:
kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, Station, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value -> SOME_CONDITION }
.mapValues { station ->
Stats(XXX)
}
.toStream().to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))
Run Code Online (Sandbox Code Playgroud)
更新:我已经简化了拓扑并打印了结果表。出于某种原因,最终的 KTable 还包含与未通过过滤器的上游记录相对应的空值记录:
kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, BiciMadStation, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.filter { _, value ->
val conditionResult = (SOME_CONDITION)
println(conditionResult)
conditionResult
}
.print()
Run Code Online (Sandbox Code Playgroud)
日志:
false
[KTABLE-FILTER-0000000002]: 1, (null<-null)
false
[KTABLE-FILTER-0000000002]: 2, (null<-null)
false
[KTABLE-FILTER-0000000002]: 3, (null<-null)
false
[KTABLE-FILTER-0000000002]: 4, (null<-null)
true
[KTABLE-FILTER-0000000002]: 5, (Station(id=5, latitude=40.4285524, longitude=-3.7025875, ...)<-null)
Run Code Online (Sandbox Code Playgroud)
答案在以下的 javadoc 中KTable.filter(...):
请注意,更改日志流的过滤器与记录流过滤器的工作方式不同,因为具有空值的记录(所谓的墓碑记录)具有删除语义。因此,对于墓碑,不评估提供的过滤谓词,但如果需要(即,如果有任何内容要删除)直接转发墓碑记录。此外,对于每个被丢弃的记录(即点不满足给定的谓词),都会转发一个墓碑记录。
这解释了为什么我看到向下游发送的空值(墓碑)记录。
为了避免它,我将 KTable 转换为 KStream,然后应用过滤器:
kStream.groupByKey().reduce({ _, newValue -> newValue },
Materialized.`as`<Int, Stations, KeyValueStore<Bytes, ByteArray>>(STATIONS_STORE)
.withKeySerde(Serdes.Integer())
.withValueSerde(stationSerde))
.toStream()
.filter { _, value -> SOME_CONDITION }
.mapValues { station ->
StationStats(station.id, station.latitude, station.longitude, ...)
}
.to(STATIONS_LOW_CAPACITY_TOPIC, Produced.with(Serdes.Integer(), stationStatsSerde))
Run Code Online (Sandbox Code Playgroud)
结果:
4 {"id":4,"latitude":"40.4302937","longitude":"-3.7069171",...}
5 {"id":5,"latitude":"40.4285524","longitude":"-3.7025875",...}
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
889 次 |
| 最近记录: |