Spring Cloud @StreamListener 条件已弃用,替代方案是什么

Mic*_*Mok 5 java spring-boot spring-cloud-stream apache-kafka-streams spring-cloud-stream-binder-kafka

我们有多个应用程序消费者监听同一个 kafka 主题,并且生产者在向主题发送消息时设置消息标头,以便特定实例可以评估标头并处理消息。例如

@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
    log.info("sydney order request received message is {}",message.getName());
}
Run Code Online (Sandbox Code Playgroud)

在 Spring Cloud Stream 3.0.0 中,@StreamListener 已被弃用,我无法在 Function 中找到条件属性的等效项。

有什么建议吗?

小智 1

虽然我也无法找到功能方法的等效项,但我确实有一个建议。

注释@StreamListener条件并不能阻止应用程序在将消息传递给侦听器 ( ) 之前必须使用消息、读取其标头并过滤掉特定记录的事实fullfillOrder()。因此,可以安全地假设您正在消费触及主题的每条消息(通过 Spring Cloud 在后台为我们实现的事件接收器),但侦听器仅在 header == sydney 时执行。

如果有一种方法可以配置 Spring Cloud 使用的事件接收器(在命中侦听器之前丢弃消息),我建议对此进行研究。如果没有,将在进行任何处理之前过滤掉任何消息(非悉尼)。如果您熟悉 Spring Cloud 的功能方法,看起来会像这样:

@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
    return msg -> {
        // to get header - msg.getHeaders().get(key, valueType);
        // filter out bad messages
    }
}
Run Code Online (Sandbox Code Playgroud)

或者

@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
    return msg -> {
        // msg.headers().lastHeader("franchiseName").value() -> filter em out
    }
}
Run Code Online (Sandbox Code Playgroud)

其他:^我的代码假设您通过spring-cloud-stream-binder-kafka. 根据列出的标签,我会注意到 Spring Cloud Stream 有两个版本的 Kafka 绑定器 - 一种用于 kafka 客户端库,另一种用于 kafka 流库。

如果不考虑 Spring Cloud / Frameworks,kafka 流中的高级 DSL 不会让您访问标头,但低级处理器 API 可以。从该示例来看,您似乎正在利用客户端绑定程序而不是spring-cloud-stream-binder-kafka-streams/ kafka 流绑定程序。我还没有看到使用低级处理器 API 的 Spring Cloud Stream + Kafka Streams Binder 的实现,所以我无法判断这是否是目标。