Confluence Kafka:消费者不会从头开始读取主题中的所有分区

NoN*_*ame 4 python apache-kafka kafka-consumer-api confluent-platform

我有一个有 40 个分区的主题。设置是这样的:

def on_assign (c,ps):
    for p in ps:
        p.offset=0
    print ps
    c.assign(ps)

conf = {'bootstrap.servers': 'localhost:9092'
        'enable.auto.commit' : False,
        'group.id' : 'confluent_consumer',
        'default.topic.config': {'auto.offset.reset': 'earliest'}
        }
consumer = Consumer(**conf)
consumer.subscribe(['topic.source'], on_assign=on_assign)

msg = consumer.poll(timeout=100000)
print "Topic is %s: | Partition is %d: | Offset is : %d | key is :%s " % (msg.topic(), msg.partition(), msg.offset(), msg.key())
Run Code Online (Sandbox Code Playgroud)

我想从偏移量 0 读取主题的所有分区topic.source。但我没有看到所有分区都会发生这种情况。对于某些分区,它从特定的偏移量读取,我假设这是提交的偏移量,group.id每次更改也没有帮助。如何从头开始读取该主题的所有分区,而不考虑提交的偏移量?

我打印ps出来on_assign(),它为所有 40 个分区打印了这样的内容:

[TopicPartition{topic=topic.source,partition=0,offset=0,error=None},TopicPartition{topic=topic.source,partition=1,offset=0,error=None}....] and so on
Run Code Online (Sandbox Code Playgroud)

Mic*_*son 5

如果您使用设置为新值或使用未使用设置group.id提交任何偏移量的组,则消费者将从分区的开头开始。auto.offset.resetearliest

也就是说,开头可能不是偏移 0。根据代理的日志保留设置,Kafka 可以删除消息,因此分区中的第一个可用消息可能位于任何偏移处。