Nag*_*Rao 8 java concurrency multithreading apache-kafka
我正在为高容量高速分布式应用编写Kafka Consumer.我只有一个主题,但传入消息的速率非常高.拥有多个服务于更多消费者的分区将适用于此用例.最好的消费方式是拥有多个流阅读器.根据文档或可用样本,ConsumerConnector提供的KafkaStream数量基于主题数量.想知道如何获得多个KafkaStream读取器[基于分区],这样我可以跨每个流跨越一个线程或从多个线程中的相同KafkaStream读取将从多个分区进行并发读取?
任何见解都非常感谢.
Nag*_*Rao 15
想分享我在邮件列表中找到的内容:
您在主题图中传递的数字控制主题分为多少个流.在您的情况下,如果传入1,则所有10个分区的数据将被送入1个流.如果传入2,则2个流中的每一个都将从5个分区获取数据.如果你传入11,其中10个将从1个分区获得数据,1个流将不会获得任何数据.
通常,您需要在自己的线程中迭代每个流.这是因为如果没有新事件,每个流都可以永久阻止.
示例代码段:
topicCount.put(msgTopic, new Integer(partitionCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = connector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(msgTopic);
for (final KafkaStream stream : streams) {
ReadTask task = new ReadTask(stream, msgTopic);
task.addObserver(this.msgObserver);
tasks.add(task); executor.submit(task);
}
Run Code Online (Sandbox Code Playgroud)
参考:http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCA+sHyy_Z903dOmnjp7_yYR_aE2sRW-x7XpAnqkmWaP66GOqf6w@mail.gmail.com%3E
| 归档时间: |
|
| 查看次数: |
8783 次 |
| 最近记录: |