我正在尝试在阅读主题后立即提交一条消息。我按照这个链接(https://www.confluence.io/blog/apache-kafka-spring-boot-application)使用spring创建了一个Kafka消费者。通常情况下,它工作得很好,消费者收到消息并等待,直到另一个人进入队列。但问题是,当我处理这些消息时,需要花费很多时间(大约10分钟),kafka队列认为该消息没有被消费(提交),并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续更长的时间时,它不会提交消息。
我已经寻找了一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但失败了。一些来源是:
Spring Boot Kafka:由于组已经重新平衡,因此无法完成提交
https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o
https://github.com/confluenceinc/confluence-kafka-dotnet/issues/470
主要课程在这里:
@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {
public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApp .class, args);
}
Run Code Online (Sandbox Code Playgroud)
消费者类(我需要在其中提交消息)
@Service
public class Consumer {
@Autowired
AppPropert prop;
Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message) throws IOException {
/*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
Properties props=prope.startProp();//just getting my properties from my config-file
ControllerPRO pro = new ControllerPRO();
List<Future<String>> …
Run Code Online (Sandbox Code Playgroud)