Asi*_*bal 151 java distributed-computing apache-kafka
我对卡夫卡比较新.我已经做了一些实验,但有一些事情我不清楚消费者抵消.从我到目前为止所理解的情况来看,当消费者开始时,它将开始读取的偏移量由配置设置决定auto.offset.reset(如果我错了,请纠正我).
现在说,例如主题中有10条消息(偏移0到9),并且消费者在它关闭之前(或者在我杀死消费者之前)恰好消耗了其中的5条消息.然后说我重启那个消费者流程.我的问题是:
如果auto.offset.reset设置为smallest,它是否总是从偏移量0开始消耗?
如果auto.offset.reset设置为largest,是否将从偏移量5开始消耗?
关于这种情况的行为总是确定的吗?
如果我的问题中的任何内容不清楚,请不要犹豫.提前致谢.
ser*_*jja 234
它比你描述的要复杂一点.在auto.offset.reset只有当您的消费群没有一个有效的抵消致力于某处配置踢(2支持偏移存储器现在卡夫卡和动物园管理员).它还取决于您使用的消费者类型.
如果您使用高级java使用者,那么请想象以下场景:
您的消费者群体中的消费者group1已经消耗了5条消息并且已经死亡.下次启动此消费者时,它甚至不会使用该auto.offset.reset配置,并将从它死亡的地方继续,因为它只会从偏移存储(Kafka或ZK,如我所提到的)获取存储的偏移量.
您在主题中有消息(如您所述),并在新的消费者组中启动消费者group2.在任何地方都没有存储偏移量,这次auto.offset.reset配置将决定是从topic(smallest)的开头还是从主题的结尾开始(largest)
影响什么偏移值将对应smallest和largest配置的另一件事是日志保留策略.想象一下,您的主题保留配置为1小时.您生成5条消息,然后一小时后再发布5条消息.该largest偏移将仍然相同,前面的例子,但smallest一个不能是0因为卡夫卡都已经删除这些信息,从而可用最小的偏移会5.
上面提到的所有内容都与之无关SimpleConsumer,每次运行时,都会决定从使用auto.offset.reset配置开始.
Isr*_*inc 78
只是一个更新:从Kafka 0.9开始,Kafka正在使用消费者的新Java版本,并且auto.offset.reset参数名称已更改; 从手册:
当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时(例如因为该数据已被删除)该怎么办:
最早:自动将偏移重置为最早的偏移量
最新:自动将偏移重置为最新偏移
none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常
其他:向消费者抛出异常.
在检查接受的答案后我花了一些时间才找到这个,所以我认为社区发布它可能是有用的.
更进一步的是offsets.retention.minutes。如果自上次提交以来的时间为> offsets.retention.minutes,则auto.offset.reset也开始执行
| 归档时间: |
|
| 查看次数: |
72519 次 |
| 最近记录: |