如何从 Spring Cloud 流中读取 Kafka 消息密钥?

jag*_*lan 1 spring apache-kafka spring-cloud-stream

我正在使用 Spring Cloud 流来消费来自 Kafka 的消息。

是否可以从代码中读取 Kafka 消息密钥?

我有一个 Kafka 主题,它通常有两种类型的消息。要采取的操作因消息键而异。我看到 spring 文档只有以下内容来阅读消息。在这里,我需要指定消息的实际映射(此处为 Greetings 类)。但是,我需要一种方法来读取消息键并确定可反序列化的 Pojo

public class GreetingsListener {

    @StreamListener(GreetingsProcessor.INPUT)
    public void handleGreetings(@Payload Greetings request) {
     
    }
}
Run Code Online (Sandbox Code Playgroud)

sob*_*cko 5

你可以尝试这样的事情:

@StreamListener(GreetingsProcessor.INPUT)
public void handleGreetings(@Payload Greetings request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)String  key) {

}
Run Code Online (Sandbox Code Playgroud)

您需要为密钥提供适当的解串器。例如,如果您的密钥是字符串,那么您可以提供:

spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)

如果需要为不同的输入通道使用不同的密钥解串器,可以在每个 kafka 绑定的生产者部分下扩展此设置。例如:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          <channel_name>:
            consumer:
              startOffset: latest
              autoCommitOffset: true
              autoCommitOnError: true
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)