Dhi*_*esh 7 c# apache-kafka kafka-consumer-api confluent-platform
我正在使用 Confluence kafka C# 客户端。如何获取此主题中消耗的最新偏移量?
除了之前的答案之外,您还可以使用
List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
Run Code Online (Sandbox Code Playgroud)
它将返回给定主题/分区从 librdkafka 轮询的最后一个偏移量
Committed对于消费者最新提交的抵消,您有类似的方法
还可以查询最新的已知偏移量
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
Run Code Online (Sandbox Code Playgroud)
它会向 kafka 集群发送请求。调用阻塞,设置适当的超时。目前,您无法同时向多个分区发送请求。您可以使用它来获取最后一个已知的偏移量,或者计算滞后
还有
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
Run Code Online (Sandbox Code Playgroud)
它将查询 librdkafka 中的内部状态,并可能返回 INVALID_OFFSET (-1001)。您可以使用它来检测由于处理数据而导致的一些滞后。(该方法的位置和结果之间的差异)
当您收到消息时,它应该包括主题、分区和消息来源的偏移量(除了键和值之外)。
从这里的例子:
consumer.OnMessage += (_, msg)
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
$"Offset: {msg.Offset} {msg.Value}");
Run Code Online (Sandbox Code Playgroud)
当它到达每个主题分区的末尾时,您还会收到一个事件
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
$" , next message will be at offset {end.Offset}");
Run Code Online (Sandbox Code Playgroud)