小编MrP*_*ver的帖子

从主题读取后立即异步提交消息

我正在尝试在阅读主题后立即提交一条消息。我按照这个链接(https://www.confluence.io/blog/apache-kafka-spring-boot-application)使用spring创建了一个Kafka消费者。通常情况下,它工作得很好,消费者收到消息并等待,直到另一个人进入队列。但问题是,当我处理这些消息时,需要花费很多时间(大约10分钟),kafka队列认为该消息没有被消费(提交),并且消费者一次又一次地读取它。我不得不说,当我的处理时间少于 5 分钟时,它运行良好,但当它持续更长的时间时,它不会提交消息。

我已经寻找了一些答案,但它对我没有帮助,因为我没有使用相同的源代码(当然还有不同的结构)。我尝试发送异步方法并异步提交消息,但失败了。一些来源是:

Spring Boot Kafka:由于组已经重新平衡,因此无法完成提交

https://www.confluence.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-least-once-exactly-o

Kafka 0.10 Java消费者不从主题读取消息

https://github.com/confluenceinc/confluence-kafka-dotnet/issues/470

主要课程在这里:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


Run Code Online (Sandbox Code Playgroud)

消费者类(我需要在其中提交消息)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka

5
推荐指数
1
解决办法
5833
查看次数

通过 TERMINAL 关闭 Kafka

:你好,我叫牛逼卡夫卡和我开始在窗口(通过终端)卡夫卡服务器follwing这个文档succedded https://dzone.com/articles/running-apache-kafka-on-windows-os https://开头卡夫卡.apache.org/快速入门

我的问题很简单,我应该如何通过 Windows 中的 cmd 终端正确关闭 KAfka?

问题是,当我关闭不同的终端(为了关闭我的计算机)时,我无法再次启动它,因为 kafka 内存不足并且致命崩溃

 ERROR Error while loading log dir C:\Users\u1\Desktop\kafka-logs (kafka.log.LogManager)
java.io.IOException: Map failed
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:944)
        at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:126)
.
Caused by: java.lang.OutOfMemoryError: Map failed
        at sun.nio.ch.FileChannelImpl.map0(Native Method)
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:941)
Run Code Online (Sandbox Code Playgroud)

有没有办法使用终端命令正确关闭我的消费者、生产者、主题、kafka 和生产者?

在此处输入图片说明 我找到了很多关于通过代码关闭它的文档,但这不是我的问题。 关闭 Kafka 连接

-------- 更新------- 我不知道如何宣布解决方案的获胜者,因为两者(Robin Moffatt、Mukesh Prajapati)都是正确的。因为当我按 CTRL + C 时,我的终端会询问我是否关闭正在执行的程序。此外,当我重新启动它时,效果很好。另一方面,Mukesh Prajapati 提供的解决方案也像魅力一样工作,正如您在以下屏幕中看到的那样,它关闭了动物园管理员的实例。(感谢两位的帮助) 在此处输入图片说明

windows cmd apache-kafka

3
推荐指数
1
解决办法
2956
查看次数

标签 统计

apache-kafka ×2

cmd ×1

java ×1

spring ×1

windows ×1