Par*_*mar 6 producer-consumer node.js apache-kafka kafkajs
在我的一个用例中,包括使用数据、执行一些操作并将其生成到新主题。
我正在使用https://www.npmjs.com/package/kafkajs npm 库。
我想在成功操作后手动提交偏移量以避免任何数据丢失。我用来autoCommit: false
避免数据在使用后自动提交。
这是手动提交偏移量的代码
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' }
])
Run Code Online (Sandbox Code Playgroud)
正如我在某处读到的,如果我们有意提交每个偏移量(在消费后立即提交偏移量),那么它将在代理上创建负载,并且这样做不好。
我需要 kafka 专家的建议来针对我的上述用例提出最佳方法以避免任何数据丢失?请指教
为了手动处理提交,下面是代码。
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
...
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
},
});
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
4751 次 |
最近记录: |