相关疑难解决方法(0)

从主题读取后立即异步提交消息

我正在尝试在阅读主题后立即提交一条消息。我按照这个链接(https://www.confluence.io/blog/apache-kafka-spring-boot-application)使用spring创建了一个Kafka消费者。通常情况下,它工作得很好,消费者收到消息并等待,直到另一个人进入队列。但问题是,当我处理这些消息时,需要花费很多时间(大约10分钟),kafka队列认为该消息没有被消费(提交),并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续更长的时间时,它不会提交消息。

我已经寻找了一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但失败了。一些来源是:

Spring Boot Kafka:由于组已经重新平衡,因此无法完成提交

https://www.confluence.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Kafka 0.10 Java消费者不从主题读取消息

https://github.com/confluenceinc/confluence-kafka-dotnet/issues/470

主要课程在这里:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


Run Code Online (Sandbox Code Playgroud)

消费者类(我需要在其中提交消息)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka

5
推荐指数
1
解决办法
5833
查看次数

标签 统计

apache-kafka ×1

java ×1

spring ×1