zer*_*d0l 4 spring spring-integration apache-kafka
我正在使用Kafka Spring Integration来使用kafka发布和使用消息.我看到Payload正确地从生产者传递给消费者,但是头部信息在某处被覆盖.
@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
ExecutionException {
try {
System.out.println("Headers :" + message.getHeaders().toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
Run Code Online (Sandbox Code Playgroud)
我得到以下标题:
Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}
Run Code Online (Sandbox Code Playgroud)
我正在生成器端构造消息,如下所示:
Message<FeelDBMessage> message = MessageBuilder
.withPayload(samplePayloadObj)
.setHeader(KafkaHeaders.MESSAGE_KEY, "key")
.setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();
// publish the message
publisher.publishMessage(message);
Run Code Online (Sandbox Code Playgroud)
以下是生产者的标题信息:
headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}
Run Code Online (Sandbox Code Playgroud)
知道为什么标题会被不同的价值所覆盖吗?
只是因为默认情况下Framework使用了不可变的 GenericMessage
.
对现有消息的任何操作(例如MessageBuilder.withPayload
)将产生新GenericMessage
实例.
从另一方面来看,Kafka不支持任何headers
抽象,如JMS或AMQP.这就是为什么KafkaProducerMessageHandler
当它向Kafka发布消息时这样做:
this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
Run Code Online (Sandbox Code Playgroud)
如你所见它根本不发送headers
.因此,其他方(用户)只是只处理message
从主题作为payload
和一些系统选项headers
一样topic
,partition
,messageKey
.
用两个词来说:我们不会在Kafka上传输标题,因为它不支持它们.
归档时间: |
|
查看次数: |
3143 次 |
最近记录: |