Adding custom headers using Spring Kafka

Ann*_*nnu 4 spring apache-kafka spring-kafka spring-boot-test

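I am planning to use the Spring Kafka client to consume and produce messages from a Kafka setup in a Spring Boot application. I see that Kafka 0.11 added support for custom headers, as detailed here. While this is available for native Kafka producers and consumers, I don't see support for adding/reading custom headers in Spring Kafka.

I am trying to implement a DLQ for messages based on a retry count, which I would like to store in the message header so that I don't have to parse the payload.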

Pim*_*oek 11

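I was looking for an answer when I stumbled upon this question. However, I am using the ProducerRecord<?, ?> class instead of Message<?>, so the header mapper does not seem to be relevant.

Here is my approach to adding a custom header: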

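// ProducerRecord(topic, value): the record is sent without a key
var record = new ProducerRecord<String, String>(topicName, "Hello World");
// Header values are raw bytes, so encode the value with an explicit charset
record.headers().add("foo", "bar".getBytes(java.nio.charset.StandardCharsets.UTF_8));
kafkaTemplate.send(record);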
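
Because Kafka header values are raw byte arrays, a retry counter like the one in the question has to be encoded before it is passed to record.headers().add(...). Below is a minimal sketch, assuming the count is stored as a UTF-8 decimal string. The class RetryCountCodec and its method names are hypothetical and not part of Kafka or Spring Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka or Spring Kafka): converts a retry
// counter to and from the byte[] form that Kafka header values use.
final class RetryCountCodec {

    private RetryCountCodec() {
    }

    // Encode the counter as a UTF-8 decimal string, e.g. 3 becomes "3".
    static byte[] encode(int retryCount) {
        return Integer.toString(retryCount).getBytes(StandardCharsets.UTF_8);
    }

    // Decode a header value. A missing (null) or malformed value is treated
    // as "never retried"; that fallback is an assumption of this sketch.
    static int decode(byte[] headerValue) {
        if (headerValue == null) {
            return 0;
        }
        try {
            return Integer.parseInt(new String(headerValue, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
```

With the producer code above, this could be used as record.headers().add("retryCount", RetryCountCodec.encode(1)), where the header name "retryCount" is also just an illustrative choice.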

Now, to read the headers (before the record is consumed), I added a custom interceptor.

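import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;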

@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
        Set<TopicPartition> partitions = records.partitions();
        partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));

        return records;
    }

    private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
        records.forEach(record -> {
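            // Collect every header stored under the key "MyHeader"
            // (use the key the producer actually set, e.g. "foo" in the snippet above)
            var myHeaders = new ArrayList<Header>();
            record.headers().headers("MyHeader").forEach(myHeaders::add);
            log.info("My Headers: {}", myHeaders);
            // Do with header as you see fit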
        });
    }

    @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
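
For the DLQ use case from the question, once the retry count has been read from the header and parsed into an int, the routing decision reduces to a comparison against a limit. The following is only a sketch under that assumption. RetryRouter, Route, and the idea of a fixed maxRetries limit are hypothetical names and choices, not something Spring Kafka provides under these names.

```java
// Hypothetical sketch (not a Spring Kafka API): decides where a failed record
// should go next, based on the retry count carried in its header.
final class RetryRouter {

    enum Route { RETRY, DEAD_LETTER }

    private final int maxRetries;

    RetryRouter(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must not be negative");
        }
        this.maxRetries = maxRetries;
    }

    // retryCount is the number of retries already attempted, as read from the
    // record header. Once the limit is reached, the record is dead-lettered.
    Route routeAfterFailure(int retryCount) {
        return retryCount >= maxRetries ? Route.DEAD_LETTER : Route.RETRY;
    }
}
```

A caller would then republish the record either to a retry topic, with the header incremented, or to the dead-letter topic, depending on the returned Route.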

The last step is to register this interceptor with the Kafka consumer container, using the following (Spring Boot) configuration:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class MessagingConfiguration {

    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

}


Art*_*lan 6

Well, Spring Kafka has provided header support since version 2.0: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers

You can create a KafkaHeaderMapper instance and use it to populate headers on the Message before sending it via KafkaTemplate.send(Message<?> message). Or you can use the plain KafkaTemplate.send(ProducerRecord<K, V> record).

When you receive records using KafkaMessageListenerContainer, the KafkaHeaderMapper can be supplied there via a MessagingMessageConverter injected into the RecordMessagingMessageListenerAdapter.

So, any custom headers can be transferred either way.