Sak*_*ket 21 python apache-kafka kafka-consumer-api kafka-python
I am using the Python high-level consumer for Kafka and want to know the latest offset for each partition of a topic. However, I cannot get it to work.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
But the output I get is
For partition 0 highwater is None
For partition 1 highwater is None
For partition 2 highwater is None
For partition 3 highwater is None
For partition 4 highwater is None
For partition 5 highwater is None
....
For partition 96 highwater is None
For partition 97 highwater is None
For partition 98 highwater is None
For partition 99 highwater is None
Subscription = None
con.seek_to_beginning() = None
con.seek_to_end() = None
I also tried an alternative approach using assign, but the result is the same
con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

con.assign(ps)
for p in ps:
    print "For partition %s highwater is %s"%(p.partition,con.highwater(p))

print "Subscription = %s"%con.subscription()
print "con.seek_to_beginning() = %s"%con.seek_to_beginning()
print "con.seek_to_end() = %s"%con.seek_to_end()
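From some of the documentation, it seems I might see this behavior if a fetch has not been issued yet. But I cannot find a way to force one. What am I doing wrong?
Or is there a different/simpler way to get the latest offsets for a topic?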
Sak*_*ket 31
After spending a day on this and several false starts, I finally found a solution that works. Posting it here so that others can refer to it.
from kafka import SimpleClient
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.common import OffsetRequestPayload

client = SimpleClient(brokers)
partitions = client.topic_partitions[topic]

# time=-1 asks for the latest offset; max_offsets=1 returns a single offset per partition
offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
offsets_responses = client.send_offset_request(offset_requests)

for r in offsets_responses:
    print("partition = %s, offset = %s" % (r.partition, r.offsets[0]))
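The two trailing arguments to OffsetRequestPayload above are easy to misread: -1 is the special timestamp meaning "latest offset" (-2 means "earliest"), and 1 is the maximum number of offsets to return per partition. Below is a minimal sketch of turning the responses into a dictionary. `FakeOffsetResponse` and `offsets_by_partition` are hypothetical names introduced here for illustration; the stand-in tuple only mimics the `partition` and `offsets` fields the answer reads, so the snippet runs without a broker. Note that SimpleClient was deprecated and, to my knowledge, removed in kafka-python 2.0, so this applies to older versions.

```python
from collections import namedtuple

# Special timestamps understood by the Kafka offset API.
LATEST = -1    # offset of the next message to be written
EARLIEST = -2  # oldest offset still available on the broker

# Hypothetical stand-in for the response objects returned by
# client.send_offset_request(); not part of kafka-python.
FakeOffsetResponse = namedtuple(
    "FakeOffsetResponse", ["topic", "partition", "error", "offsets"]
)


def offsets_by_partition(responses):
    """Map each partition id to the first offset in its response."""
    return {r.partition: r.offsets[0] for r in responses}


if __name__ == "__main__":
    sample = [
        FakeOffsetResponse("mytopic", 0, 0, (120,)),
        FakeOffsetResponse("mytopic", 1, 0, (95,)),
    ]
    print(offsets_by_partition(sample))
```

With a real client, the list passed to `offsets_by_partition` would be the `offsets_responses` from the answer above.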
avr*_*avr 17
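If you would rather use the Kafka shell scripts in kafka/bin, you can use kafka-run-class.sh to get the latest and earliest offsets.
The command to get the latest offset looks like this
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -1 --topic topicname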
The command to get the earliest offset looks like this
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --time -2 --topic topicname
You can find more information about the Get Offsets Shell at the following link
Hope this helps!
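If the GetOffsetShell commands are run from a script, their output can be post-processed in Python. The sketch below assumes the tool prints one `topic:partition:offset` line per partition, which is the format I have seen but which may differ between Kafka versions. `parse_offsets` and `message_counts` are hypothetical helper names, not part of Kafka.

```python
def parse_offsets(output):
    """Parse 'topic:partition:offset' lines into {partition: offset}."""
    offsets = {}
    for line in output.splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines
        # Split from the right: the last two fields are partition and offset.
        _topic, partition, offset = line.rsplit(":", 2)
        offsets[int(partition)] = int(offset)
    return offsets


def message_counts(latest_output, earliest_output):
    """Return latest minus earliest offset for each partition.

    latest_output is assumed to come from `--time -1` and
    earliest_output from `--time -2`.
    """
    latest = parse_offsets(latest_output)
    earliest = parse_offsets(earliest_output)
    return {p: latest[p] - earliest.get(p, 0) for p in latest}
```

The difference is the number of offsets currently retained per partition, which on a compacted topic can be larger than the number of messages actually stored.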
Ita*_*der 14
from kafka import KafkaConsumer, TopicPartition

TOPIC = 'MYTOPIC'
GROUP = 'MYGROUP'
BOOTSTRAP_SERVERS = ['kafka01:9092', 'kafka02:9092']

consumer = KafkaConsumer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id=GROUP,
    enable_auto_commit=False
)

for p in consumer.partitions_for_topic(TOPIC):
    tp = TopicPartition(TOPIC, p)
    consumer.assign([tp])
    # committed() returns None if GROUP has never committed an offset for this partition
    committed = consumer.committed(tp)
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    lag = last_offset - committed if committed is not None else None
    print("topic: %s partition: %s committed: %s last: %s lag: %s" % (TOPIC, p, committed, last_offset, lag))

consumer.close(autocommit=False)
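The lag calculation above can also be written without reassigning and seeking the consumer for every partition. This is a sketch of a variation, not the answer's own method: it swaps `seek_to_end` plus `position` for `KafkaConsumer.end_offsets`, which should be available in kafka-python 1.3.4 and later. `consumer_lag` is a hypothetical helper name; it accepts any object exposing `end_offsets(partitions)` and `committed(partition)`.

```python
def consumer_lag(consumer, partitions):
    """Return {partition: lag}, with lag None where nothing is committed.

    `consumer` is expected to behave like kafka.KafkaConsumer created
    with a group_id; `partitions` is a list of TopicPartition objects.
    """
    # One call fetches the end offset of every partition.
    end_offsets = consumer.end_offsets(partitions)
    lag = {}
    for tp in partitions:
        committed = consumer.committed(tp)
        if committed is None:
            # The group has no committed offset for this partition yet.
            lag[tp] = None
        else:
            lag[tp] = end_offsets[tp] - committed
    return lag
```

With a real consumer, `partitions` would be built as in the answer, for example `[TopicPartition(TOPIC, p) for p in consumer.partitions_for_topic(TOPIC)]`.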
With kafka-python>=1.3.4 you can use:
kafka.KafkaConsumer.end_offsets(partitions)
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
from kafka import TopicPartition
from kafka.consumer import KafkaConsumer

con = KafkaConsumer(bootstrap_servers = brokers)
ps = [TopicPartition(topic, p) for p in con.partitions_for_topic(topic)]

# Returns a dict mapping each TopicPartition to its end offset
end_offsets = con.end_offsets(ps)
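Because the end offset is "last available message + 1", getting the offset of the last message itself takes one more step, and an empty partition needs special handling. The sketch below assumes `beginning_offsets` is available alongside `end_offsets` (I believe both arrived in kafka-python 1.3.4) and ignores compaction and transaction markers, which can leave no readable record at `end - 1`. `last_message_offsets` is a hypothetical helper name.

```python
def last_message_offsets(consumer, partitions):
    """Return {partition: offset of the last message, or None if empty}.

    `consumer` is expected to behave like kafka.KafkaConsumer;
    `partitions` is a list of TopicPartition objects.
    """
    begin = consumer.beginning_offsets(partitions)
    end = consumer.end_offsets(partitions)
    result = {}
    for tp in partitions:
        if end[tp] > begin[tp]:
            # The end offset points one past the last available message.
            result[tp] = end[tp] - 1
        else:
            # Beginning equals end: the partition currently holds nothing.
            result[tp] = None
    return result
```

Passing the `con` and `ps` objects from the snippet above would give one entry per partition of the topic.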