如何在使用消息时访问Kafka标头?

iam*_*ddy 6 java spring spring-integration apache-kafka

以下是我的配置

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
            kafka-consumer-context-ref="consumerContext"
            auto-startup="true"
            channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>
Run Code Online (Sandbox Code Playgroud)

inputFromKafka 在下面进行转换

public Message<?> transform(final Message<?> message) {

System.out.println( "KAFKA Message Headers " + message.getHeaders());

final Map<String, Map<Integer, List<Object>>> origData =  (Map<String, Map<Integer, List<Object>>>) message.getPayload();

        // some code to figure-out the nonPartitionedData
        return MessageBuilder.withPayload(nonPartitionedData).build();
    }
Run Code Online (Sandbox Code Playgroud)

无论如何,上面的print语句只打印两个一致的标题

KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}

在发送Kafka消息时,一些标题被传递,KafkaHeaders.MESSAGE_KEY但我也没有回复,想知道​​是否有实现这个目标?

Art*_*lan 5

不幸的是,它不起作用......

Producer部分(KafkaProducerMessageHandler)看起来是这样的:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
Run Code Online (Sandbox Code Playgroud)

如你所见,我们不向messageHeadersKafka 发送任何信息topic.仅payloadmessageKeyKafka协议指定的情况下完全正确.

从另一边,Consumerside(KafkaHighLevelConsumerMessageSource)执行以下逻辑:

if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}
Run Code Online (Sandbox Code Playgroud)

如你所见,我们在这里并不关心messageKey.

KafkaMessageDrivenChannelAdapter(<int-kafka:message-driven-channel-adapter>)是给你的!它在将消息发送到频道之前执行此操作:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
Run Code Online (Sandbox Code Playgroud)

  • `...传递了一些标题包括......`Kafka没有标题的概念; 如果要传输自定义信息,则必须在有效负载中. (4认同)