Asi*_*bal 4 java multithreading distributed-computing apache-kafka kafka-consumer-api
我最近一直在与卡夫卡合作,对消费者群体下的消费者有点困惑.混淆的中心是将消费者实现为流程还是线程.对于这个问题,假设我正在使用高级消费者.
让我们考虑一下我尝试过的场景.在我的主题中有2个分区(为简单起见,我们假设复制因子只有1).我创建了一个消费者(ConsumerConnector)过程consumer1与组group1,然后创建尺寸2的主题计数地图,然后产生了2个消费者线程consumer1_thread1和consumer1_thread2该过程下.它看起来像consumer1_thread1正在消耗分区0并且consumer1_thread2正在消耗分区1.这种行为总是确定的吗?以下是代码段.Class TestConsumer是我的消费者线程类.
...
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(2));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(2);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new TestConsumer(stream, threadNumber));
threadNumber++;
}
...
Run Code Online (Sandbox Code Playgroud)
现在,让我们考虑另一个场景(我没有尝试但很好奇),我开始2个消费者进程consumer1,consumer2两个都有相同的组group1,每个都是一个单线程进程.现在我的问题是:
在这种情况下,两个独立的消费者流程(在同一群组下)如何与分区相关联?它与上述单进程多线程场景有何不同?
通常,消费者线程或进程如何映射/与主题中的分区相关?
Kafka文档确实说消费者组下的每个消费者将使用一个分区.但是,这是指消费者线程(如我上面的代码示例)还是独立的消费者流程?
关于将消费者作为流程与线程实现,我在这里缺少任何微妙的东西吗?提前致谢.
使用者组可以运行多个使用者实例(具有相同的多个进程group-id).虽然消耗每个分区,但组中只有一个消费者实例正在使用它.
例如,如果您的主题包含2个分区,并且您启动了group-A具有2个使用者实例的使用者组,那么每个实例都会消耗来自该主题的特定分区的消息.
如果你开始相同的2消费者以不同的群ID group-A&group-B然后从话题的两个分区的信息将被广播到他们中的每一个.因此,在这种情况下,运行的消费者实例group-A将具有来自该主题的两个分区的消息,同样也是如此group-B.
编辑:根据你的评论说,
我想知道在同一个流程下拥有2个消费者线程与2个消费者流程之间的有效区别是什么(在两种情况下组都相同)
group-id整个群集中的消费者是相同/全局的.假设你已经启动了一个有2个线程的进程 - 然后生成另一个进程(可能在不同的机器中),同一个groupId有2个以上的线程,那么kafka将添加这2个新线程来使用来自该主题的消息.因此最终将有4个线程负责从同一主题消费.然后,Kafka将触发重新平衡以将分区重新分配给线程,因此可能会发生因为线程正在使用的特定分区T1 of process P1可能被线程占用T2 of process P2.以下几行来自维基页面
当使用相同的使用者组名称启动新进程时,Kafka会将该进程的线程添加到可用于使用Topic并触发"重新平衡"的线程集.在此重新平衡期间,Kafka会将可用分区分配给可用线程,可能会将分区移动到另一个进程.如果您混合使用新旧业务逻辑,则某些消息可能会转到旧逻辑.