jag*_*lan 1 spring apache-kafka spring-cloud-stream
我正在使用 Spring Cloud 流来消费来自 Kafka 的消息。
是否可以从代码中读取 Kafka 消息密钥?
我有一个 Kafka 主题,它通常有两种类型的消息。要采取的操作因消息键而异。我看到 spring 文档只有以下内容来阅读消息。在这里,我需要指定消息的实际映射(此处为 Greetings 类)。但是,我需要一种方法来读取消息键并确定可反序列化的 Pojo
public class GreetingsListener {
@StreamListener(GreetingsProcessor.INPUT)
public void handleGreetings(@Payload Greetings request) {
}
}
Run Code Online (Sandbox Code Playgroud)
你可以尝试这样的事情:
@StreamListener(GreetingsProcessor.INPUT)
public void handleGreetings(@Payload Greetings request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)String key) {
}
Run Code Online (Sandbox Code Playgroud)
您需要为密钥提供适当的解串器。例如,如果您的密钥是字符串,那么您可以提供:
spring.cloud.stream.kafka.binder.configuration.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)
如果需要为不同的输入通道使用不同的密钥解串器,可以在每个 kafka 绑定的生产者部分下扩展此设置。例如:
spring:
cloud:
stream:
kafka:
bindings:
<channel_name>:
consumer:
startOffset: latest
autoCommitOffset: true
autoCommitOnError: true
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1604 次 |
最近记录: |