NoN*_*ame 4 python apache-kafka kafka-consumer-api confluent-platform
我有一个有 40 个分区的主题。设置是这样的:
def on_assign (c,ps):
for p in ps:
p.offset=0
print ps
c.assign(ps)
conf = {'bootstrap.servers': 'localhost:9092'
'enable.auto.commit' : False,
'group.id' : 'confluent_consumer',
'default.topic.config': {'auto.offset.reset': 'earliest'}
}
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)
msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
Run Code Online (Sandbox Code Playgroud)
我想从偏移量 0 读取主题的所有分区topic.source
。但我没有看到所有分区都会发生这种情况。对于某些分区,它从特定的偏移量读取,我假设这是提交的偏移量,group.id
每次更改也没有帮助。如何从头开始读取该主题的所有分区,而不考虑提交的偏移量?
我打印ps
出来on_assign()
,它为所有 40 个分区打印了这样的内容:
[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
Run Code Online (Sandbox Code Playgroud)
如果您使用设置为新值或使用未使用设置group.id
提交任何偏移量的组,则消费者将从分区的开头开始。auto.offset.reset
earliest
也就是说,开头可能不是偏移 0。根据代理的日志保留设置,Kafka 可以删除消息,因此分区中的第一个可用消息可能位于任何偏移处。
归档时间: |
|
查看次数: |
8460 次 |
最近记录: |