Nit*_*its 35 apache-kafka kafka-consumer-api
每次运行消费者jar时,请任何人都可以告诉我如何使用Kafka Consumer API从一开始就阅读消息.
Nau*_*lus 39
这适用于0.9.x消费者.基本上,在创建使用者时,您需要使用该属性为此使用者分配使用者组ID ConsumerConfig.GROUP_ID_CONFIG.每次启动使用者执行类似这样的操作时,随机生成使用者组ID properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());(属性是您将传递给构造函数的java.util.Properties的实例new KafkaConsumer(properties)).
随机生成客户端意味着新的使用者组在kafka中没有与其关联的任何偏移量.因此,在此之后我们要做的是为此方案设置策略.正如该auto.offset.reset物业的文件所说:
当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时(例如因为该数据已被删除)该怎么办:
- 最早:自动将偏移重置为最早的偏移量
- 最新:自动将偏移重置为最新偏移
- none:如果未找到先前的偏移量或消费者的组,则向消费者抛出异常
- 其他:向消费者抛出异常.
因此,从上面列出的选项中我们需要选择earliest策略,以便新的消费者组每次都从头开始.
你在java中的代码看起来像这样:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer(properties);
Run Code Online (Sandbox Code Playgroud)
您现在需要弄清楚的唯一事情是,当多个消费者属于同一个消费者群体但是分布式如何生成随机ID并在这些实例之间分配它们以便它们都属于同一个消费者群体.
希望能帮助到你!
ucs*_*nil 16
执行此操作的一个选项是每次启动时都有一个唯一的组ID,这意味着Kafka会从头开始向您发送主题中的消息.在为KafkaConsumer以下项设置属性时执行以下操作:
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
Run Code Online (Sandbox Code Playgroud)
另一个选择是使用,consumer.seekToBeginning(consumer.assignment())但这不起作用,除非Kafka首先通过让消费者调用poll方法从您的消费者获得心跳.所以打电话poll(),然后做一个seekToBeginning()然后再打电话,poll()如果你想从一开始就记录所有的记录.这是一个小小的hackey,但这似乎是0.9版本中最可靠的方法.
// At this point, there is no heartbeat from consumer so seekToBeinning() wont work
// So call poll()
consumer.poll(0);
// Now there is heartbeat and consumer is "alive"
consumer.seekToBeginning(consumer.assignment());
// Now consume
ConsumerRecords<String, String> records = consumer.poll(0);
Run Code Online (Sandbox Code Playgroud)
小智 14
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Run Code Online (Sandbox Code Playgroud)
如果您只是避免保存任何偏移量,消费者将始终在开始时重置。
小智 6
一种可能的解决方案是在订阅一个或多个主题的同时使用ConsumerRebalanceListener的实现。当分配新分区或从使用者中删除新分区时,ConsumerRebalanceListener包含回调方法。下面的代码示例对此进行了说明:
public class SkillsConsumer {
private String topic;
private KafkaConsumer<String, String> consumer;
private static final int POLL_TIMEOUT = 5000;
public SkillsConsumer(String topic) {
this.topic = topic;
Properties properties = ConsumerUtil.getConsumerProperties();
properties.put("group.id", "consumer-skills");
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(Collections.singletonList(this.topic),
new PartitionOffsetAssignerListener(this.consumer));
}
}
public class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
public PartitionOffsetAssignerListener(KafkaConsumer kafkaConsumer) {
this.consumer = kafkaConsumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//reading all partitions from the beginning
for(TopicPartition partition : partitions)
consumer.seekToBeginning(partition);
}
Run Code Online (Sandbox Code Playgroud)
}
现在,只要将分区分配给使用者,就会从头开始读取每个分区。
要重置使用者组,您可以删除Zookeeper组ID
import kafka.utils.ZkUtils;
ZkUtils.maybeDeletePath(<zkhost:zkport>, </consumers/group.id>);`
Run Code Online (Sandbox Code Playgroud)
props.put("auto.offset.reset", "smallest");在创建时使用高级消费者集ConsumerConfig
| 归档时间: |
|
| 查看次数: |
62513 次 |
| 最近记录: |