Kafka 消费者 - 在反序列化之前读取消息头

Amj*_*der 2 java apache-kafka kafka-consumer-api

有没有办法在反序列化之前读取消息头?

我写了下面的代码,但我被迫在这里反序列化,有什么办法不反序列化吗?

    while (true) {

        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            for (Header header : consumerRecord.headers()) {
                if (header.key().equals("my header")) {
                    String data = "\n New record received .. \n" +
                            " Value: " + consumerRecord.value() +
                            " Topic: " + consumerRecord.topic() +
                            " Header: " + header.key() +
                            " Partition: " + consumerRecord.partition();

                    logger.info(data);
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

cri*_*007 5

您需要编写自己的反序列化器实现,以便能够在数据有效负载之前访问标头

或者...

任何不反序列化的方法

将反序列化器类设置为 ByteArrayDeserializer 而不是 StringDeserializer。

然后,手动进行反序列化,

for (ConsumerRecord<byte[], byte[]> consumerRecord : consumer.poll(Duration.ofSeconds(1))) {
    for (Header header : consumerRecord.headers()) {
       ...

    }
    String s = new String(consumerRecord.value(), "UTF-8"); // for example
}
Run Code Online (Sandbox Code Playgroud)