Kafka Stream - 如果在一段时间内没有收到给定键的事件,如何发送警报

Aur*_*ien 5 apache-kafka-streams

如果在一段时间内未在给定键的主题中收到任何事件,我需要发送警报。使用 KafkaStream 解决此用例的最佳方法是什么?

我试过:

1) windowedBy与抑制运算符一起:

    stream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ZERO))
        .count()
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .filter((k, v) -> v == 0)
        .toStream()
        .map((windowId, count) -> KeyValue.pair(windowId.key(), AlarmEvent.builder().build()))
        .to(ALARMS, Produced.with(Serdes.String(), AlarmEvent.serde()));
Run Code Online (Sandbox Code Playgroud)

但似乎窗口不会关闭,直到过期后发生事件,因此在定义的超时之后无法发送警报。

2)使用带有标点符号的处理器API ,它似乎可以工作,但我只使用 TopologyTestDriveradvanceWallClockTime ()进行测试。不确定这个 advanceWallClockTime() 反映了实时提前,或者只会在事件接收时改变,从而回到 1) 中的问题。

3)如果标点符号有效,我想在 ValueTranformer 中使用它从 DSL 拓扑中受益。但是,我遇到了如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?中描述的问题。。无法从标点符号实例向下游发送事件。

4)最后,我想到了为每个分区定期(例如每秒)注入一些虚拟事件,从而人为地强制内部时钟前进。这将使我能够使用干净简单的 DSL 窗口并抑制运算符。

Mat*_*Sax 1

2) 使用处理器 API 和标点符号,它似乎可以工作,但我只使用 TopologyTestDriver 和 advanceWallClockTime() 进行测试。不确定这个 advanceWallClockTime() 反映了实时提前,或者只会在事件接收时改变,从而回到 1) 中的问题。

这是正确的做法。顾名思义,标点符号可以根据挂钟时间(即系统时间)触发。TopologyTestDriver出于测试目的模拟挂钟时间,但KafkaStreams将使用系统时间。

3)如果标点符号有效,我想在 ValueTranformer 中使用它以从 DSL 拓扑中受益。但是,我遇到了如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件? 中描述的问题。无法从标点符号实例向下游发送事件。

你需要用transform()它来代替。forward()在 a 的标点符号中不允许通过发出数据ValueTransformer,因为您可以发出任何密钥,从而违反了未修改密钥的约定。

4)最后,我想到了为每个分区定期(例如每秒)注入一些虚拟事件,从而人为地强制内部时钟前进。这将使我能够使用干净简单的 DSL 窗口并抑制运算符。

那也应该有效。