Eug*_*sky 6 apache-kafka kafka-consumer-api
Kafka 对于分区返回 endOffset 15,但可以使用的最后一条消息的偏移量为 13,而不是我期望的 14。我想知道为什么。
卡夫卡文档阅读
在默认的read_uncommissed隔离级别下,结束偏移量是高水位线(即最后成功复制的消息的偏移量加一)。对于read_commited消费者来说,结束偏移量是最后一个稳定偏移量(LSO),它是高水位线的最小值和任何打开事务的最小偏移量。
这是 kafkacat 的输出。我使用 kafkacat,因为它可以打印消息偏移量:
$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Reached end of topic TK [0] at offset 15: exiting
Run Code Online (Sandbox Code Playgroud)
同样令人困惑的是——而且很可能是相关的——偏移量不是连续的,尽管我还没有设置压缩等。
更多细节:
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Topic: TK PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: TK Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: TK Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Run Code Online (Sandbox Code Playgroud)
通过 kafka-console-consumer.sh 打印密钥:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
--offset earliest --partition 0 --timeout-ms 5000 \
--property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 10 messages
Run Code Online (Sandbox Code Playgroud)
注意:本主题的产生不涉及交易,并且 *)消费是在模式下完成的read_uncommitted
。
*) 实际上,processing.guarantee 设置为exactly_once_beta
,因此这相当于使用事务。
更多信息 事实 证明,我可以使用我的 Streams 应用程序可靠地重现这种情况(1. 擦除 kafka/zookeeper 数据,2. 重新创建主题,3. 运行应用程序),其输出是显示此问题的主题。同时,我已将 Streams 应用程序精简为这种无操作拓扑,并且仍然可以重现它:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
--> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001 (topic: TK)
<-- KSTREAM-SOURCE-0000000000
Run Code Online (Sandbox Code Playgroud)
与此同时 ,我已将本地运行的 Kafka 代理 (2.5.0) 替换为在 Docker 容器中运行的代理 (wurstmeister/kafka:2.13-2.6.0)。问题仍然存在。
该应用程序使用版本为 6.0.1-ccs 的 kafka 库,对应于 2.6.0。
您应该避免对偏移量进行计算,Kafka 确保任何新的偏移量只会大于上一个偏移量。您可能希望使用密钥并通过验证已收到正确数量的密钥来跟踪您是否已收到适当数量的消息。
Kafka 有很多事情需要处理,例如 Exactly-Once 语义、重新发送消息以及与该主题相关的其他内部任务。这些消息将被丢弃(不会与您共享)。您只会看到您的消息,并且这些消息偏移量只会增加。
这些事务标记不会暴露给应用程序,但由处于 read_commissed 模式的消费者使用,以过滤掉已中止事务中的消息,并且不返回属于打开事务一部分的消息