Ita*_*ayB 3 node.js apache-kafka kafka-consumer-api
我正在使用kafka-node来使用来自特定Kafka主题的消息.当我重新启动我的节点服务器时,它会按预期启动我的消费者,但它的默认行为是从偏移量0开始消耗,而我的目标是仅接收新消息(也就是从当前偏移开始消耗).我没有找到从API文档中实现这一目标的方法.谁知道它是否受支持?
谢谢!
我在kafka-node github问题(链接)中问了这个问题并得到了答案.它现在可用(从v0.4.0开始).以下代码段对我有用:
consumerClient = new kafka.Client('localhost:2181');
/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
var latestOffset = data['myTopic']['0'][0];
console.log("Consumer current offset: " + latestOffset);
});
var consumer = new kafka.HighLevelConsumer(
consumerClient,
[
{ topic: 'myTopic', partition: 0, fromOffset: -1 }
],
{
autoCommit: false
}
);
Run Code Online (Sandbox Code Playgroud)
干杯!
| 归档时间: |
|
| 查看次数: |
6643 次 |
| 最近记录: |