我正在尝试使用Kafka Streams(即不是简单的 Kafka 消费者)从重试主题中读取以前无法处理的事件。我希望从重试主题中消费,如果处理仍然失败(例如,如果外部系统关闭),我希望将事件放回重试主题。因此,我不想立即继续消费,而是在消费前等待一段时间,以免系统被暂时无法处理的消息淹没。
简化后,代码目前执行此操作,我希望为其添加延迟。
fun createTopology(topic: String): Topology {
val streamsBuilder = StreamsBuilder()
streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
.peek { key, msg -> logger.info("Received event for key $key : $msg") }
.map { key, msg -> enrich(msg) }
.foreach { key, enrichedMsg -> archive(enrichedMsg) }
return streamsBuilder.build()
}
Run Code Online (Sandbox Code Playgroud)
我尝试使用 Window Delay 来设置它,但没有设法让它工作。我当然可以在 a 里面做一个 sleep peek
,但这会留下一个线程挂起并且听起来不是一个非常干净的解决方案。
延迟如何工作的确切细节对我的用例来说并不是非常重要。例如,所有这些都可以正常工作:
x
几秒内该主题上的所有事件都被一次性消耗掉。开始/完成消费后,流等待x
几秒钟再消费x
在放在主题上几秒钟后处理x
每个事件之间有几秒钟的延迟如果有人能提供几行 Kotlin 或 Java 代码来完成上述任何一项,我将不胜感激。