Akh*_*aby 13 spring apache-kafka spring-kafka
我正在使用 Spring-Kafka 编写我的第一个 Kafka Consumer。查看了框架提供的不同选项,并且对此几乎没有怀疑。有人可以在下面澄清,如果你已经工作过。
问题 - 1:根据 Spring-Kafka 文档,有两种方法可以实现 Kafka-Consumer;“您可以通过配置 MessageListenerContainer 并提供消息侦听器或使用 @KafkaListener 注释来接收消息”。有人能告诉我什么时候应该选择一个选项而不是另一个选项吗?
问题 - 2:我选择了 KafkaListener 方法来编写我的应用程序。为此,我需要初始化一个容器工厂实例,并且在容器工厂内部有控制并发的选项。只是想仔细检查我对并发的理解是否正确。
假设,我有一个主题名称 MyTopic,其中有 4 个分区。为了使用来自 MyTopic 的消息,我已经启动了我的应用程序的 2 个实例,这些实例是通过将并发设置为 2 来启动的。因此,理想情况下,根据 kafka 分配策略,2 个分区应分配给 consumer1,其他 2 个分区应分配给 consumer2 . 既然并发设置为2,那么每个consumer是否会启动2个线程,并行的从topic中消费数据?如果我们并行消费,我们还应该考虑什么。
问题 3 - 我选择了手动确认模式,而不是在外部管理偏移量(不将其持久化到任何数据库/文件系统)。那么我是否需要编写自定义代码来处理重新平衡,或者框架会自动管理它?我认为没有,因为我只有在处理完所有记录后才承认。
问题 - 4:另外,使用手动 ACK 模式,哪个监听器会提供更好的性能?BATCH 消息侦听器或普通消息侦听器。我想如果我使用普通消息侦听器,则在处理每条消息后将提交偏移量。
粘贴下面的代码供您参考。
批确认消费者:
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record : " + record.value());
// Process the message here..
listener.addOffset(record.topic(), record.partition(), record.offset());
}
acknowledgment.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
初始化容器工厂:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configs;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
// Not sure about the impact of this property, so going with 1
factory.setConcurrency(2);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
@KafkaListener是一个消息驱动的“POJO”,它添加了有效载荷转换、参数匹配等内容。如果你实现了,MessageListener你只能ConsumerRecord从 Kafka获取原始数据。请参阅@KafkaListener 注释。
是的,并发代表线程数;每个线程创建一个Consumer; 它们并行运行;在您的示例中,每个将获得 2 个分区。
如果我们并行消费,我们还应该考虑什么。
您的侦听器必须是线程安全的(没有共享状态或任何此类状态需要受锁保护。
目前尚不清楚“处理重新平衡事件”是什么意思。当重新平衡发生时,框架将提交任何未决的偏移量。
这没什么区别;消息侦听器 Vs。批处理侦听器只是一种偏好。即使使用消息侦听器,使用 MANUAL ackmode,在处理完轮询的所有结果后,也会提交偏移量。在 MANUAL_IMMEDIATE 模式下,偏移量是一一提交的。
| 归档时间: |
|
| 查看次数: |
10614 次 |
| 最近记录: |