kafka-node start从上一个偏移消耗

Ita*_*ayB 3 node.js apache-kafka kafka-consumer-api

我正在使用kafka-node来使用来自特定Kafka主题的消息.当我重新启动我的节点服务器时,它会按预期启动我的消费者,但它的默认行为是从偏移量0开始消耗,而我的目标是仅接收新消息(也就是从当前偏移开始消耗).我没有找到从API文档中实现这一目标的方法.谁知道它是否受支持?

谢谢!

Ita*_*ayB 6

我在kafka-node github问题(链接)中问了这个问题并得到了答案.它现在可用(从v0.4.0开始).以下代码段对我有用:

consumerClient = new kafka.Client('localhost:2181');

/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
        var latestOffset = data['myTopic']['0'][0];
        console.log("Consumer current offset: " + latestOffset);
});

var consumer = new kafka.HighLevelConsumer(
        consumerClient,
        [
            { topic: 'myTopic', partition: 0, fromOffset: -1 }
        ],
        {
            autoCommit: false
        }
);
Run Code Online (Sandbox Code Playgroud)

干杯!