如何在 Python 中以编程方式获取每个 Kafka 主题分区的最新偏移量

Che*_*dva 4 python apache-kafka kafka-topic

我是 Kafka 新手,想要获取每个分区的 Kafka 主题的位置。我在文档中看到 - https://kafka- python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafkaadminclient - 偏移量可以通过函数获得KafkaAdminClient.list_consumer_group_offsets,但我没有看到这样的方法那里的位置。

有人知道我怎样才能得到它吗?

Gio*_*ous 5

使用confluent-kafka-python

您可以使用position

检索分区列表的当前位置(偏移量)。

from confluent_kafka import Consumer, TopicPartition


consumer = Consumer({"bootstrap.servers": "localhost:9092"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())] 

offset_per_partition = consumer.position(partitions)
Run Code Online (Sandbox Code Playgroud)

或者,您也可以使用get_watermark_offsets,但必须一次传递一个分区,因此需要多次调用:

检索分区的低偏移量和高偏移量。

from confluent_kafka import Consumer, TopicPartition


consumer = Consumer({"bootstrap.servers": "localhost:9092"})
topic = consumer.list_topics(topic='topicName')
partitions = [TopicPartition('topicName', partition) for partition in list(topic.topics['topicName'].partitions.keys())] 

for p in partitions:
    low_offset, high_offset = consumer.get_watermark_offsets(p)
    print(f"Latest offset for partition {f}: {high_offset}")
Run Code Online (Sandbox Code Playgroud)

使用kafka-python

您可以使用end_offsets

获取给定分区的最后一个偏移量。分区的最后一个偏移量是即将到来的消息的偏移量,即最后一个可用消息的偏移量+1。

此方法不会更改分区的当前消费者位置。

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer


consumer = KafkaConsumer(bootstrap_servers = "localhost:9092" )
partitions= = [TopicPartition('myTopic', p) for p in consumer.partitions_for_topic('myTopic')]
last_offset_per_partition = consumer.end_offsets(partitions)
Run Code Online (Sandbox Code Playgroud)

如果您想遍历所有主题,可以使用以下方法:

from kafka import TopicPartition
from kafka.consumer import KafkaConsumer


kafka_topics = consumer.topics()
for topic in kafka_topics: 
    partitions= = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    last_offset_per_partition = consumer.end_offsets(partitions)
Run Code Online (Sandbox Code Playgroud)