在春季启动卡夫卡监听器中接收卡夫卡密钥

use*_*111 2 apache-kafka kafka-consumer-api spring-kafka

我是春季卡夫卡的新手。我有一个微服务,它使用 kafka 密钥发送消息,该密钥是用户定义的对象。

1) 第一个微服务使用 MyKey 对象实例的密钥向 Kafka 发送消息。

2)我需要做的是,聆听该主题并使用密钥获取此消息,并使用该密钥创建一个新密钥。

假设消息是由 myKey 的密钥发送的。我想在侦听器中做的是创建一个新的扩展键:

     @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
     public void process( @Payload MyMessage myMessage){

        MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY());
        ....
        ....
        kafkaTemplate.send(TOPIC,  myExtendedKey, message);
      }
Run Code Online (Sandbox Code Playgroud)

我不知道如何获取在侦听器中发送的消息的密钥。

Fel*_*pia 18

使用 @KafkaListener 获取消息的键、值和元数据的最简单方法是在 KafkaListener 函数中使用 ConsumerRecord,而不是仅接收有效负载作为值记录。

@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
    System.out.println(record.key());
    System.out.println(record.value());
    System.out.println(record.partition());
    System.out.println(record.topic());
    System.out.println(record.offset());
}
Run Code Online (Sandbox Code Playgroud)

没有漂亮的注释,但它可以工作,此外,如果您想从 Kafka 主题接收记录,处理这些记录,然后将它们再次发送到另一个 Kafka 主题,我建议您看一下 Kafka Streams API。


Gar*_*ell 10

阅读文档

...

最后,有关消息的元数据可从消息头中获得。您可以使用以下标题名称来检索邮件的标题:

KafkaHeaders.RECEIVED_MESSAGE_KEY

KafkaHeaders.RECEIVED_TOPIC

KafkaHeaders.RECEIVED_PARTITION_ID

KafkaHeaders.RECEIVED_TIMESTAMP

KafkaHeaders.TIMESTAMP_TYPE

以下示例显示了如何使用标头:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
Run Code Online (Sandbox Code Playgroud)

偏移量也是可用的。