如何使用Kafka Consumer API中的密钥读取数据?

Kha*_*han 6 apache-kafka kafka-consumer-api

我正在使用以下代码构建消息...

Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
KeyedMessage<String, String> keyedMsg = new KeyedMessage<String, String>(topic, "device-420",  "{message:'hello world'}");          
producer.send(keyedMsg);
Run Code Online (Sandbox Code Playgroud)

并使用以下代码块进行消费...

//Key = topic name, Value = No. of threads for topic
Map<String, Integer> topicCount = new HashMap<String, Integer>();       
topicCount.put(topic, 1);

//ConsumerConnector creates the message stream for each topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);         

// Get Kafka stream for topic
List<KafkaStream<byte[], byte[]>> kStreamList = consumerStreams.get(topic);

// Iterate stream using ConsumerIterator
for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
    ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();         
    while (consumerIte.hasNext()) {             
        MessageAndMetadata<byte[], byte[]> msg = consumerIte.next();            
        System.out.println(topic.toUpperCase() + ">"
                + " Partition:" + msg.partition()
                + " | Key:"+ new String(msg.key())
                + " | Offset:" + msg.offset()
                + " | Message:"+ new String(msg.message()));
    }
}
Run Code Online (Sandbox Code Playgroud)

一切正常,因为我正在阅读数据主题。所以我想知道在这个例子中,有没有办法使用消息密钥(device-420)来消费数据?

Mat*_*Sax 7

简短的回答:不。

Kafka中最小的粒度是分区。您可以编写一个仅从单个分区读取的客户端。但是,一个分区可以包含多个键,您需要消耗该分区中包含的所有键。