小编sjo*_*omj的帖子

延迟 Kafka Streams 消费

我正在尝试使用Kafka Streams(即不是简单的 Kafka 消费者)从重试主题中读取以前无法处理的事件。我希望从重试主题中消费,如果处理仍然失败(例如,如果外部系统关闭),我希望将事件放回重试主题。因此,我不想立即继续消费,而是在消费前等待一段时间,以免系统被暂时无法处理的消息淹没。

简化后,代码目前执行此操作,我希望为其添加延迟。

fun createTopology(topic: String): Topology {
    val streamsBuilder = StreamsBuilder()

    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .peek { key, msg -> logger.info("Received event for key $key : $msg") }
        .map { key, msg -> enrich(msg) }
        .foreach { key, enrichedMsg -> archive(enrichedMsg) }

    return streamsBuilder.build()
}
Run Code Online (Sandbox Code Playgroud)

我尝试使用 Window Delay 来设置它,但没有设法让它工作。我当然可以在 a 里面做一个 sleep peek,但这会留下一个线程挂起并且听起来不是一个非常干净的解决方案。

延迟如何工作的确切细节对我的用例来说并不是非常重要。例如,所有这些都可以正常工作:

  1. 过去x几秒内该主题上的所有事件都被一次性消耗掉。开始/完成消费后,流等待x几秒钟再消费
  2. 每个事件x在放在主题上几秒钟后处理
  3. 流消耗消息,x每个事件之间有几秒钟的延迟

如果有人能提供几行 Kotlin 或 Java 代码来完成上述任何一项,我将不胜感激。

java kotlin apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
2164
查看次数

标签 统计

apache-kafka ×1

apache-kafka-streams ×1

java ×1

kotlin ×1