kafka-node 几个消费者

pak*_*aka 2 node.js apache-kafka

有一个基本示例,它对 1 个消费者来说就像一个魅力。它接收消息。但是添加一个额外的消费者将被忽略。

let kafka = require('kafka-node');
let client = new kafka.Client();

let producer = new kafka.Producer(client);

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}]);


producer.on('ready', function () {

    producer.send([{topic:'topic1', messages: 'topic 1 msg' ], (err,data)=>{
        console.log(err,'1 sent');
    });
    producer.send([{topic:'topic2', messages: 'topic 1 msg'}], (err,data)=>{
        console.log(err, '2 sent');
    });

});
producer.on('error', function (err) {
    console.log('err', err);
})



consumer1.on('message',(message) =>{
    console.log(11, message);
});
consumer2.on('message',(message) =>{
    console.log(22, message);
})
Run Code Online (Sandbox Code Playgroud)

消费者2的'22'事件永远不会触发的问题。如果我使用命令行工具检查该主题的数据存在

str*_*tle 5

您忘记在消费者参数中添加消费者组。在您的情况下,两个消费者都属于一个消费者组(默认情况下称为kafka-node-group)。拥有一个消费者组意味着每条消息将在每个消费者组中传递一次。而且由于您的主题有 0 个分区,因此只有第一个使用者会处理消息。

如果你想向两个消费者发送相同的消息,你必须有两个消费者组(每个组一个消费者)。您可以通过以下方式进行

let consumer1 =  new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}], {groupId: 'group1'});
let consumer2 =  new kafka.Consumer(client,[ {topic: 'topic2', partition: 0}], {groupId: 'group2'});
Run Code Online (Sandbox Code Playgroud)

如果您希望您的消费者一起(按分区)处理消息,您需要增加主题中的分区数。