如何从一开始就使用Kafka Consumer API读取数据?

Nit*_*its 35 apache-kafka kafka-consumer-api

每次运行消费者jar时,请任何人都可以告诉我如何使用Kafka Consumer API从一开始就阅读消息.

Nau*_*lus 39

这适用于0.9.x消费者.基本上,在创建使用者时,您需要使用该属性为此使用者分配使用者组ID ConsumerConfig.GROUP_ID_CONFIG.每次启动使用者执行类似这样的操作时,随机生成使用者组ID properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());(属性是您将传递给构造函数的java.util.Properties的实例new KafkaConsumer(properties)).

随机生成客户端意味着新的使用者组在kafka中没有与其关联的任何偏移量.因此,在此之后我们要做的是为此方案设置策略.正如该auto.offset.reset物业的文件所说:

当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时(例如因为该数据已被删除)该怎么办:

  • 最早:自动将偏移重置为最早的偏移量
  • 最新:自动将偏移重置为最新偏移
  • none:如果未找到先前的偏移量或消费者的组,则向消费者抛出异常
  • 其他:向消费者抛出异常.

因此,从上面列出的选项中我们需要选择earliest策略,以便新的消费者组每次都从头开始.

你在java中的代码看起来像这样:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
Run Code Online (Sandbox Code Playgroud)

您现在需要弄清楚的唯一事情是,当多个消费者属于同一个消费者群体但是分布式如何生成随机ID并在这些实例之间分配它们以便它们都属于同一个消费者群体.

希望能帮助到你!

  • 这不是一个好的解决方案.这样做会导致zookeeper数据堆积,不断创建和放弃新条目.最好删除下面KingJulien所指示的小组条目以及他发布的链接答案. (6认同)

ucs*_*nil 16

执行此操作的一个选项是每次启动时都有一个唯一的组ID,这意味着Kafka会从头开始向您发送主题中的消息.在为KafkaConsumer以下项设置属性时执行以下操作:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
Run Code Online (Sandbox Code Playgroud)

另一个选择是使用,consumer.seekToBeginning(consumer.assignment())但这不起作用,除非Kafka首先通过让消费者调用poll方法从您的消费者获得心跳.所以打电话poll(),然后做一个seekToBeginning()然后再打电话,poll()如果你想从一开始就记录所有的记录.这是一个小小的hackey,但这似乎是0.9版本中最可靠的方法.

// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
Run Code Online (Sandbox Code Playgroud)


小智 14

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Run Code Online (Sandbox Code Playgroud)

如果您只是避免保存任何偏移量,消费者将始终在开始时重置。


小智 6

一种可能的解决方案是在订阅一个或多个主题的同时使用ConsumerRebalanceListener的实现。当分配新分区或从使用者中删除新分区时,ConsumerRebalanceListener包含回调方法。下面的代码示例对此进行了说明:

public class SkillsConsumer {

private String topic;

private KafkaConsumer<String, String> consumer;

private static final int POLL_TIMEOUT = 5000;

public SkillsConsumer(String topic) {
    this.topic = topic;
    Properties properties = ConsumerUtil.getConsumerProperties();
    properties.put("group.id", "consumer-skills");
    this.consumer = new KafkaConsumer<>(properties);
    this.consumer.subscribe(Collections.singletonList(this.topic),
            new PartitionOffsetAssignerListener(this.consumer));
    }
}

public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {

private KafkaConsumer consumer;

public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
    this.consumer = kafkaConsumer;
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    //reading all partitions from the beginning
    for(TopicPartition partition : partitions)
        consumer.seekToBeginning(partition);
}
Run Code Online (Sandbox Code Playgroud)

}

现在,只要将分区分配给使用者,就会从头开始读取每个分区。


Kin*_*ien 5

1)/sf/answers/1195908101/

2)http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3CCAOG_4QYz2ynH45a8kXb8qw7xw4vDRRwNqMn5j9ERFxJ8RfKGCg@mail.gmail.com%3E

要重置使用者组,您可以删除Zookeeper组ID

 import kafka.utils.ZkUtils;
 ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);`
Run Code Online (Sandbox Code Playgroud)


use*_*864 2

props.put("auto.offset.reset", "smallest");在创建时使用高级消费者集ConsumerConfig

  • 这只能确保您第一次阅读时它会从头开始阅读。后续读取将完全忽略此设置并从最后一个偏移量读取。 (13认同)
  • 应该是 props.put("auto.offset.reset", "earliest"); (4认同)