MrP*_*ver 5 java spring apache-kafka
我正在尝试在阅读主题后立即提交一条消息。我按照这个链接(https://www.confluence.io/blog/apache-kafka-spring-boot-application)使用spring创建了一个Kafka消费者。通常情况下,它工作得很好,消费者收到消息并等待,直到另一个人进入队列。但问题是,当我处理这些消息时,需要花费很多时间(大约10分钟),kafka队列认为该消息没有被消费(提交),并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续更长的时间时,它不会提交消息。
我已经寻找了一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但失败了。一些来源是:
Spring Boot Kafka:由于组已经重新平衡,因此无法完成提交
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
https://github.com/confluenceinc/confluence-kafka-dotnet/issues/470
主要课程在这里:
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
Run Code Online (Sandbox Code Playgroud)
消费者类(我需要在其中提交消息)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
Run Code Online (Sandbox Code Playgroud)
如何在从队列中读取消息后立即提交该消息。
我想确保当我收到消息时我会立即提交该消息。现在,当我完成 (System.out.println) 之后的方法的执行时,消息就会被提交。那么有人能告诉我该怎么做吗?
- - - 更新 - - - -
抱歉回复晚了,但正如 @GirishB 所建议的那样,我一直在查看 GirishB 的配置,但我不知道在哪里可以定义我想要从配置文件(applications.yml)中读取/侦听的主题。我看到的所有示例都使用与此类似的结构(http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html)。是否有任何选项可以读取其他服务器中声明的主题?使用类似于@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
=========== 解决方案 1 ======================================= ===
我遵循@victor gallet的建议,并包含了消费者属性的声明,以便在消费方法中容纳“Acknowledgment”对象。我还关注了此链接(https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main /java/org/s1p/CommonConfiguration.java)获取我用来声明和设置所有属性的所有方法(consumerProperties、consumerFactory、kafkaListenerContainerFactory)。我发现的唯一问题是“new SeekToCurrentErrorHandler()”声明,因为我收到一个错误,目前我无法解决它(如果有人向我解释的话那就太好了)。
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
//factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props = new HashMap<>();
Properties propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
//props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
//props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer"));
return props;
}
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
acknowledgment.acknowledge();// commit immediately
Properties props=prop.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
try {
CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method
/*This works fine when the processLaunch method takes less than 5 minutes,
if it takes longer the consumer will get the same message from the topic and start again with this operation
*/
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End of consumer method ");
}
}
``````````````````````````````````````````````````````````
Run Code Online (Sandbox Code Playgroud)
您必须修改您的消费者配置并将属性enable.auto.commit设置为 false :
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Run Code Online (Sandbox Code Playgroud)
然后,您必须修改 Spring Kafka Listener 工厂并将 ack-mode 设置为MANUAL_IMMEDIATE。这是一个示例ConcurrentKafkaListenerContainerFactory:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Run Code Online (Sandbox Code Playgroud)
正如文档中所解释的,MANUAL_IMMEDIATE意味着:当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。
您可以在这里找到所有提交方法。
然后,在侦听器代码中,您可以通过添加对象来手动提交偏移量Acknowledgment,例如:
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
// commit immediately
acknowledgment.acknowledge();
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5833 次 |
| 最近记录: |