Nik*_*sov 4 outer-join apache-kafka apache-kafka-streams
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。即"1-new"
, "1-old"
, "2-new"
, "2-old"
. 键是唯一的,但有些可能会丢失。
现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同密钥 ID 的新旧消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
Run Code Online (Sandbox Code Playgroud)
现在我想知道由于某种原因在时间窗口中丢失的新/旧密钥。是否可以使用 KafkaStreams API 实现?
在我"1-old"
收到密钥并且"1-new"
不在 2 分钟内的情况下,仅在这种情况下我想将 id 报告1
为可疑。
DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin
实际上可以用来做“举重”。因此,在leftJoin
使用.transform(...)
附加状态之后,您可以进一步“清理”数据。
对于old&null
您收到的每张唱片,请将其放入商店。如果您稍后收到,old&new
您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描存储以查找“足够旧”的条目,因此您可以确保不会产生以后的old&new
连接结果。对于这些条目,您old&null
可以从存储中发出和删除它们。
作为替代方案,您也可以省略连接,并在单个transform()
with 状态中执行所有操作。为此,您需要KStream#merge()
新旧流并调用transform()
合并的流。
注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。
归档时间: |
|
查看次数: |
1493 次 |
最近记录: |