hel*_*eak 6 python kafka-python confluent-platform
我对 confluence_kafka 还很陌生,但我已经获得了一些使用 kafka-python 的经验。我想做的是改变开始消费消息的偏移量。这就是为什么我想构建一个能够返回到以前的消息的消费者客户端,以便返回将填充仪表板的数据。说使用kafka-python包我可以使用seek_to_end (https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L788)方法来获取位置值最新的提交。这样我就可以使用该seek方法减去值并返回到之前的消息(https://github.com/dpkp/kafka-python/blob/c0fddbd24269d4333e3b6630a23e86ffe33dfcb6/kafka/consumer/group.py#L738)
另一方面,conflient_kafka似乎没有类似的功能,到目前为止我发现的是使用变量OFFSET_END,其值为-1,并且它不会返回最新和最大的偏移数值一。我也可以使用“seek”函数,但我需要一种方法来获取最新偏移量的数值,而不是-1.
我的 avro 消费者看起来像
from confluent_kafka.avro import AvroConsumer
if __name__ == '__main__':
c = AvroConsumer({"bootstrap.servers": "locahost:29092", "group.id":"mygroup",'schema.registry.url': 'http://localhost:8081',
'enable.auto.commit': True,'default.topic.config': {'auto.offset.reset': 'smallest'}})
def my_assign (consumer, partitions):
for p in partitions:
p.offset = confluent_kafka.OFFSET_END
print("offset=",p.offset)
print('assign', partitions)
print('position:',consumer.position(partitions))
consumer.assign(partitions)
c.subscribe(["mytopic"],on_assign=my_assign)
while True:
m = c.poll(1)
if m is None:
continue
if m.error() is None:
print('Received message', m.value(),m.offset())
c.close()
Run Code Online (Sandbox Code Playgroud)
产生以下结果:
offset= -1
assign [TopicPartition{topic=mytopic,partition=0,offset=-1,error=None}]
position: [TopicPartition{topic=mytopic,partition=0,offset=-1001,error=None}]
Run Code Online (Sandbox Code Playgroud)
并等待下一条消息。我想知道是否有人可以帮助我。谢谢
您可以使用Consumer.get_watermark_offsets(参见文档)
例子:
cfg = {
# ... ...
"group.id": str(uuid4())
}
consumer = AvroConsumer(cfg)
topic_partition = TopicPartition("topic-name", partition=123)
low, high = consumer.get_watermark_offsets(topic_partition)
print("the latest offset is {}".format(high))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7958 次 |
| 最近记录: |