Ris*_*abh 5 java apache-kafka apache-storm
我有一个3节点kafka群集设置.我正在使用风暴来阅读来自kafka的消息.我系统中的每个主题都有7个分区.
现在我面临一个奇怪的问题.直到3天前,一切都运转良好.但是,现在看来我的风暴拓扑无法从2个分区 - #1和#4中专门读取.
我试着深入研究问题并发现在我的kafka日志中,对于这两个分区,缺少一个偏移,即在5964511之后,下一个偏移是5964513而不是5964512.
由于缺少偏移,Simple Consumer无法继续下一个偏移.我做错了什么或者它是一个已知的错误?
可能是这种行为的原因是什么?
我使用以下代码来读取有效偏移的窗口:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets) {
System.out.println(validOffset + " : ");
}
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset );
return largestOffset;
}
Run Code Online (Sandbox Code Playgroud)
这给了我以下输出:
4529948 : 6000878
Run Code Online (Sandbox Code Playgroud)
因此,我提供的偏移量在偏移范围内.
抱歉回复晚了,但是...
我针对这种情况进行编码,方法是使用 Long 实例 var 来保存下一个要读取的偏移量,然后在获取后检查返回的 FetchResponse 是否 hasError()。如果出现错误,我会将下一个偏移值更改为合理的值(可能是下一个偏移或最后一个可用偏移),然后重试。
归档时间: |
|
查看次数: |
1231 次 |
最近记录: |