与 KafkaStreams 的窗口结束外连接

Nik*_*sov 4 outer-join apache-kafka apache-kafka-streams

我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。即"1-new", "1-old", "2-new", "2-old". 键是唯一的,但有些可能会丢失。

现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同密钥 ID 的新旧消息。

    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })
Run Code Online (Sandbox Code Playgroud)

现在我想知道由于某种原因在时间窗口中丢失的新/旧密钥。是否可以使用 KafkaStreams API 实现?

在我"1-old"收到密钥并且"1-new"不在 2 分钟内的情况下,仅在这种情况下我想将 id 报告1为可疑。

Mat*_*Sax 6

DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin实际上可以用来做“举重”。因此,在leftJoin使用.transform(...)附加状态之后,您可以进一步“清理”数据。

对于old&null您收到的每张唱片,请将其放入商店。如果您稍后收到,old&new您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描存储以查找“足够旧”的条目,因此您可以确保不会产生以后的old&new连接结果。对于这些条目,您old&null可以从存储中发出和删除它们。

作为替代方案,您也可以省略连接,并在单个transform()with 状态中执行所有操作。为此,您需要KStream#merge()新旧流并调用transform()合并的流。

注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。

  • 多个原因:(1)连接使用滑动窗口,而聚合使用跳跃窗口(所以它是不同的语义)(2)reduce() 也被连续评估,因此您可以解决某些问题并且需要使用抑制() plus计算实际连接的下游运算符。(3) 结合第 (2) 点,您需要将所有原始数据作为 `reduce` 的结果(即,使用 `aggregate` 并返回原始记录列表)。这会增加消息大小并且可能不适用于较大的窗口。 (2认同)