标签: spring-kafka

卡夫卡消费者没有正确提交偏移量

我有一个使用以下属性定义的 Kafka 消费者:

session.timeout.ms = 60000
heartbeat.interval.ms = 6000
Run Code Online (Sandbox Code Playgroud)

我们注意到大约 2000 条消息的滞后,并看到消费者多次使用相同的消息(通过我们的应用程序日志)。另外,注意到一些消息需要大约 10 秒才能完全处理。我们怀疑消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者接收到了相同的消息。

为了解决这个问题,我们引入了更多的属性:

auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go

And, we set the concurrency to 1.
Run Code Online (Sandbox Code Playgroud)

这解决了我们的问题。滞后开始减少并最终变为 0。

但是,我仍然不清楚为什么会首先出现问题。据我了解,默认情况下:

enable.auto.commit = true
auto.commit.interval.ms=5000
Run Code Online (Sandbox Code Playgroud)

因此,理想情况下,消费者应该每 5 秒提交一次。如果在此时间范围内未完全处理消息,会发生什么情况?消费者正在提交什么偏移量?问题是否是由于轮询记录大小过大(默认为 500)导致的

另外,关于 poll() 方法,我读到:

poll() 调用在后台以 set auto.commit.interval.ms 发出。

那么,最初如果 poll() 更早地每 5 秒发生一次(默认为 auto.commit.interval),为什么它不提交最新的偏移量?因为消费者还没有完成处理吗?然后,它应该在接下来的第 5 秒提交该偏移量。

有人可以回答这些查询并解释为什么会出现原始问题吗?

spring apache-kafka spring-boot kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
3672
查看次数

我们可以在spring boot中使用多个kafka模板吗?

在我的 spring boot kafka 发布者应用程序中,我想提供对以 String(json) 或字节格式发布消息的支持,因为我想同时提供对 json 和 avro 的支持。但是 Spring Boot 中的 Kafka 模板让我们只定义其中一个模板。有没有办法同时使用模板或任何其他方式来为 json 和 avro 提供支持?

KafkaTemplate<String, String> 仅适用于字符串,但我也想发布 avro,它应该类似于 KafkaTemplate<String, byte[]>

apache-kafka kafka-producer-api spring-kafka

3
推荐指数
1
解决办法
2801
查看次数

使用spring kafka时如何压缩生产者中的数据

我目前正在使用 spring-kafka 发送数据,如下所示:

val json = objectWriter.writeValueAsString(obj)
kafkaTemplate.send(topic, json)
Run Code Online (Sandbox Code Playgroud)

我如何告诉 KafkaTemplate 在发送之前使用 snappy 压缩 json?

java spring kotlin apache-kafka spring-kafka

3
推荐指数
1
解决办法
2626
查看次数

如何手动启动Kafka监听器?

我有一些方法注释,@KafkaListener但我只想手动启动其中的一些(取决于某些条件)。

@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
    // consume
}
Run Code Online (Sandbox Code Playgroud)
@PostConstruct
private void startConsumers() {
    if (true) {
        kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
    }
}
Run Code Online (Sandbox Code Playgroud)

但此时kafkaListenerEndpointRegistry.getListenerContainers()是空列表并kafkaListenerEndpointRegistry.getListenerContainer("consumer1")返回null。所以也许@PostConstruct调用方法的时间太早了,监听器还没有注册。我尝试使用注释startConsumers()方法,@Scheduled(fixedDelay = 100)并且侦听器已经可用。但是@Scheduled对于我想在启动应用程序后调用一次的东西,使用并不是一个好的决定。

java spring apache-kafka spring-boot spring-kafka

3
推荐指数
1
解决办法
1640
查看次数

Spring-Kafka 与 kafka-clients 直接对比

我有一个基于 spring-boot 的应用程序,需要在 Kafka 上消费/生成事件。我正在犹豫图书馆的选择。

直接使用 Kafka-clients 似乎很简单,而无需管理 spring-Kafka 和 Kafka-clients 之间的兼容性矩阵:

另一方面,使用 spring-Kafka 帮助项目规范化使用 Kafka 的配置属性并添加嵌入式 kafka 进行测试。

是否有任何其他原因使用 spring-Kafka 进行 spring-boot,而不是直接使用 kafka-clients ?

java apache-kafka spring-boot spring-kafka

3
推荐指数
1
解决办法
2256
查看次数

AWS Kafka (MSK) - 如何生成密钥库和信任库并在我的 Spring Cloud Stream 应用程序中使用它们?

有没有关于如何在 Spring Cloud Stream 应用程序中使用 AWS MSK 详细信息的信息?

我相信我们需要生成一个密钥库和信任库,然后将它们合并到我们的应用程序中?我浏览了 AWS MSK 的“客户端身份验证”页面,发现这非常令人困惑。

任何人都可以帮我解决这个问题吗?我只是想部署这个使用 AWS MSK(3 个代理)的应用程序。

谢谢你。

java amazon-web-services spring-cloud-stream spring-kafka aws-msk

3
推荐指数
1
解决办法
1631
查看次数

spring-kafka AckMode中MANUAL和MANUAL_IMMEDIATE有什么区别

从 spring-docs,我可以看到

MANUAL - 消息侦听器负责确认()确认;之后,应用与 BATCH 相同的语义。

MANUAL_IMMEDIATE - 当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。

但是如果侦听器提交偏移量到底有什么区别。为MANUAL模式做了哪些额外的步骤

apache-kafka kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
3318
查看次数

是否可以从 kafka 消息中获取消息密钥的最新值

假设我对同一个消息键有不同的值。

例如:

{
userid: 1,
email: user123@xyz.com }

{
userid: 1,
email: user456@xyz.com }

{
userid: 1,
email: user789@xyz.com }
Run Code Online (Sandbox Code Playgroud)

在上面的这种情况下,我只想要用户更新的最新值,即“user789@xyz.com”。

我的 kafka 流应该只给我第三个值,而不是前两个值。

apache-kafka apache-kafka-streams spring-kafka confluent-platform ksqldb

3
推荐指数
1
解决办法
1886
查看次数

如何优雅地关闭 spring-kafka 消费者应用程序

我已经实现了spring-kafka消费者应用程序。

我想要消费者应用程序正常关闭。

当前的消费者应用程序使用 Linux 命令终止 kill -9 pid

我现在正在使用@KafkaListener注释。

如果我退出 Spring Boot 应用程序,我想可靠地关闭消费者,我该怎么办?


我一直在使用@Predestory以可靠地退出 spring boot 应用程序,但我不太确定这是否与它有关。

java apache-kafka spring-boot kafka-consumer-api spring-kafka

3
推荐指数
1
解决办法
2823
查看次数

java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerGroupMetadata

我正在尝试创建一个简单的 Kafka 制作人。因为我是这个主题的新手,所以我遵循了一个教程。我按照视频中的建议创建了一个配置文件。这是我正在使用的配置文件。

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, KafkaProducerModel> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, KafkaProducerModel> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Run Code Online (Sandbox Code Playgroud)

当我尝试运行我的应用程序时出现以下错误。

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerGroupMetadata
    at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
    at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    at …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot kafka-producer-api spring-kafka

3
推荐指数
1
解决办法
3067
查看次数