sjo*_*omj 5 java kotlin apache-kafka apache-kafka-streams
我正在尝试使用Kafka Streams(即不是简单的 Kafka 消费者)从重试主题中读取以前无法处理的事件。我希望从重试主题中消费,如果处理仍然失败(例如,如果外部系统关闭),我希望将事件放回重试主题。因此,我不想立即继续消费,而是在消费前等待一段时间,以免系统被暂时无法处理的消息淹没。
简化后,代码目前执行此操作,我希望为其添加延迟。
fun createTopology(topic: String): Topology {
val streamsBuilder = StreamsBuilder()
streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
.peek { key, msg -> logger.info("Received event for key $key : $msg") }
.map { key, msg -> enrich(msg) }
.foreach { key, enrichedMsg -> archive(enrichedMsg) }
return streamsBuilder.build()
}
Run Code Online (Sandbox Code Playgroud)
我尝试使用 Window Delay 来设置它,但没有设法让它工作。我当然可以在 a 里面做一个 sleep peek
,但这会留下一个线程挂起并且听起来不是一个非常干净的解决方案。
延迟如何工作的确切细节对我的用例来说并不是非常重要。例如,所有这些都可以正常工作:
x
几秒内该主题上的所有事件都被一次性消耗掉。开始/完成消费后,流等待x
几秒钟再消费x
在放在主题上几秒钟后处理x
每个事件之间有几秒钟的延迟如果有人能提供几行 Kotlin 或 Java 代码来完成上述任何一项,我将不胜感激。
您无法真正暂停使用 Kafka Streams 从输入主题读取数据 - “延迟”的唯一方法是调用“睡眠”,但正如您所提到的,这会阻塞整个线程并且不是一个好的解决方案。
但是,您可以做的是使用有状态处理器,例如process()
(带有附加状态存储)而不是foreach()
. 如果重试失败,您不会将记录放回到输入主题中,而是将其放入存储中,并注册一个具有所需重试延迟的标点符号。如果标点符号触发,您将重试,如果重试成功,您将从存储中删除该条目并取消标点符号;否则,您将等到标点符号再次触发。