卡夫卡春季整合:卡夫卡消费者不会出现

zer*_*d0l 4 spring spring-integration apache-kafka

我正在使用Kafka Spring Integration来使用kafka发布和使用消息.我看到Payload正确地从生产者传递给消费者,但是头部信息在某处被覆盖.

@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
        ExecutionException {
    try {
            System.out.println("Headers :" + message.getHeaders().toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Run Code Online (Sandbox Code Playgroud)

我得到以下标题:

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}
Run Code Online (Sandbox Code Playgroud)

我正在生成器端构造消息,如下所示:

Message<FeelDBMessage> message = MessageBuilder
                .withPayload(samplePayloadObj)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();

        // publish the message
        publisher.publishMessage(message);
Run Code Online (Sandbox Code Playgroud)

以下是生产者的标题信息:

 headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}
Run Code Online (Sandbox Code Playgroud)

知道为什么标题会被不同的价值所覆盖吗?

Art*_*lan 6

只是因为默认情况下Framework使用了不可变的 GenericMessage.

对现有消息的任何操作(例如MessageBuilder.withPayload)将产生新GenericMessage实例.

从另一方面来看,Kafka不支持任何headers抽象,如JMS或AMQP.这就是为什么KafkaProducerMessageHandler当它向Kafka发布消息时这样做:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
Run Code Online (Sandbox Code Playgroud)

如你所见它根本不发送headers.因此,其他方(用户)只是只处理message从主题作为payload和一些系统选项headers一样topic,partition,messageKey.

用两个词来说:我们不会在Kafka上传输标题,因为它不支持它们.