Eri*_*oda 4 node.js apache-kafka kafka-consumer-api
我正在使用kafka-node(kafka的节点客户端),使用使用者来检索有关主题的消息.不幸的是,我收到了"offsetOutOfRange"条件(调用了offsetOutOfRange回调).我的应用程序运行良好,直到消费者显着落后于生产者,在最早和最新的偏移之间留下了一个很大的差距.此时我(也许是错误的)假设消费者能够继续接收消息(并希望能够接收到生产者).
我的kafka消费者客户端代码如下:
:
:
var kafka = require('kafka-node');
var zookeeper = "10.0.1.201:2181";
var id = "embClient";
var Consumer = kafka.Consumer;
var client = new kafka.Client(zookeeper, id);
var consumer = new Consumer( client, [ { topic: "test", partition: 0 } ], { autoCommit: false } );
consumer.on('error', [error callback...]);
consumer.on('offsetOutOfRange', [offset error callback...]);
consumer.on('message', [message callback...]);
:
:
Run Code Online (Sandbox Code Playgroud)
我做错了什么,或者我错过了什么?
如果没有,我有几个问题:
(a)是否有一种公认的"最佳"方式来写客户端以优雅地处理这种情况?
(b)为何会提出这个条件?(我假设一个客户应该能够继续阅读它停止的消息,最终(理想情况下)赶上......)
(c)我是否需要编写代码/逻辑来处理这种情况,并明确地重新定位消费者偏移量以进行读取?(这看起来有点麻烦)......
任何帮助表示赞赏.
我相信该应用程序可能会尝试读取Kafka中不再提供的消息.Kafka根据log.retention.*属性删除旧消息.假设您已向Kafka发送1000条消息.由于保留,Kafka删除了前500条消息.如果您的应用尝试阅读消息350,它将失败并且它将引发offsetOutOfRange错误.之所以会发生这种情况,是因为您的消费者非常缓慢,以至于Kafka经纪人已经在消费者处理消息之前删除了消息.或者您的消费者崩溃了,但上次处理的消息的偏移量已保存在某处.
您可以使用Offset类检索最新/最早的可用偏移量(请参阅方法fetch)并更新消费者的偏移量.我们使用这种方法.
In general it's not easy to tell what should the app do when this situation occur because clearly something is very wrong.
Hope it helps, Lukáš
| 归档时间: |
|
| 查看次数: |
3928 次 |
| 最近记录: |