use*_*111 2 apache-kafka kafka-consumer-api spring-kafka
我是春季卡夫卡的新手。我有一个微服务,它使用 kafka 密钥发送消息,该密钥是用户定义的对象。
1) 第一个微服务使用 MyKey 对象实例的密钥向 Kafka 发送消息。
2)我需要做的是,聆听该主题并使用密钥获取此消息,并使用该密钥创建一个新密钥。
假设消息是由 myKey 的密钥发送的。我想在侦听器中做的是创建一个新的扩展键:
@KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
public void process( @Payload MyMessage myMessage){
MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
....
....
kafkaTemplate.send(TOPIC, myExtendedKey, message);
}
Run Code Online (Sandbox Code Playgroud)
我不知道如何获取在侦听器中发送的消息的密钥。
Fel*_*pia 18
使用 @KafkaListener 获取消息的键、值和元数据的最简单方法是在 KafkaListener 函数中使用 ConsumerRecord,而不是仅接收有效负载作为值记录。
@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.partition());
System.out.println(record.topic());
System.out.println(record.offset());
}
Run Code Online (Sandbox Code Playgroud)
没有漂亮的注释,但它可以工作,此外,如果您想从 Kafka 主题接收记录,处理这些记录,然后将它们再次发送到另一个 Kafka 主题,我建议您看一下 Kafka Streams API。
Gar*_*ell 10
请阅读文档。
...
最后,有关消息的元数据可从消息头中获得。您可以使用以下标题名称来检索邮件的标题:
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
以下示例显示了如何使用标头:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
Run Code Online (Sandbox Code Playgroud)
偏移量也是可用的。
归档时间: |
|
查看次数: |
7079 次 |
最近记录: |