lli*_*rik 5 java apache-kafka kafka-consumer-api
我有min.insync.replicas = 2一个复制因子为 3 的 kafka 主题,一个生产者使用acks=all. 一段时间后(在 1 分钟内),使用 java kafka 客户端为此主题创建了所有发送到该主题的新消费者的消息。consumer.endOffsets()获取此主题的所有 kafka 分区的使用方法结束偏移量。同一方法的另一个调用consumer.endOffsets有时会为某些分区返回不同的结束偏移量。
在此设置中,创建消费者后没有新消息发送到 kafka 主题。
根据java文档endOffsets:
/**
* Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming
* message, i.e. the offset of the last available message + 1. If messages have never been written
* to the the partition, the offset returned will be 0.
*
* <p>
* This method does not change the current consumer position of the partitions.
* <p>
* When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
* This is the offset of the first message with an open transaction. The LSO moves forward as transactions
* are completed.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
Run Code Online (Sandbox Code Playgroud)
endOffsets 返回由所有副本确认的最后稳定偏移量 (LSO)。
为什么有时(不是经常)结束偏移量在此方法的后续调用之间发生变化?endOffsets 最终一致是预期的行为吗?一个错误?
| 归档时间: |
|
| 查看次数: |
433 次 |
| 最近记录: |