我有一个使用以下属性定义的 Kafka 消费者:
session.timeout.ms = 60000
heartbeat.interval.ms = 6000
Run Code Online (Sandbox Code Playgroud)
我们注意到大约 2000 条消息的滞后,并看到消费者多次使用相同的消息(通过我们的应用程序日志)。另外,注意到一些消息需要大约 10 秒才能完全处理。我们怀疑消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者接收到了相同的消息。
为了解决这个问题,我们引入了更多的属性:
auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go
And, we set the concurrency to 1.
Run Code Online (Sandbox Code Playgroud)
这解决了我们的问题。滞后开始减少并最终变为 0。
但是,我仍然不清楚为什么会首先出现问题。据我了解,默认情况下:
enable.auto.commit = true
auto.commit.interval.ms=5000
Run Code Online (Sandbox Code Playgroud)
因此,理想情况下,消费者应该每 5 秒提交一次。如果在此时间范围内未完全处理消息,会发生什么情况?消费者正在提交什么偏移量?问题是否是由于轮询记录大小过大(默认为 500)导致的
另外,关于 poll() 方法,我读到:
poll() 调用在后台以 set auto.commit.interval.ms 发出。
那么,最初如果 poll() 更早地每 5 秒发生一次(默认为 auto.commit.interval),为什么它不提交最新的偏移量?因为消费者还没有完成处理吗?然后,它应该在接下来的第 5 秒提交该偏移量。
有人可以回答这些查询并解释为什么会出现原始问题吗?
spring apache-kafka spring-boot kafka-consumer-api spring-kafka
在我的 spring boot kafka 发布者应用程序中,我想提供对以 String(json) 或字节格式发布消息的支持,因为我想同时提供对 json 和 avro 的支持。但是 Spring Boot 中的 Kafka 模板让我们只定义其中一个模板。有没有办法同时使用模板或任何其他方式来为 json 和 avro 提供支持?
KafkaTemplate<String, String> 仅适用于字符串,但我也想发布 avro,它应该类似于 KafkaTemplate<String, byte[]>
我目前正在使用 spring-kafka 发送数据,如下所示:
val json = objectWriter.writeValueAsString(obj)
kafkaTemplate.send(topic, json)
Run Code Online (Sandbox Code Playgroud)
我如何告诉 KafkaTemplate 在发送之前使用 snappy 压缩 json?
我有一些方法注释,@KafkaListener但我只想手动启动其中的一些(取决于某些条件)。
@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
// consume
}
Run Code Online (Sandbox Code Playgroud)
@PostConstruct
private void startConsumers() {
if (true) {
kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
}
}
Run Code Online (Sandbox Code Playgroud)
但此时kafkaListenerEndpointRegistry.getListenerContainers()是空列表并kafkaListenerEndpointRegistry.getListenerContainer("consumer1")返回null。所以也许@PostConstruct调用方法的时间太早了,监听器还没有注册。我尝试使用注释startConsumers()方法,@Scheduled(fixedDelay = 100)并且侦听器已经可用。但是@Scheduled对于我想在启动应用程序后调用一次的东西,使用并不是一个好的决定。
我有一个基于 spring-boot 的应用程序,需要在 Kafka 上消费/生成事件。我正在犹豫图书馆的选择。
直接使用 Kafka-clients 似乎很简单,而无需管理 spring-Kafka 和 Kafka-clients 之间的兼容性矩阵:
另一方面,使用 spring-Kafka 帮助项目规范化使用 Kafka 的配置属性并添加嵌入式 kafka 进行测试。
是否有任何其他原因使用 spring-Kafka 进行 spring-boot,而不是直接使用 kafka-clients ?
有没有关于如何在 Spring Cloud Stream 应用程序中使用 AWS MSK 详细信息的信息?
我相信我们需要生成一个密钥库和信任库,然后将它们合并到我们的应用程序中?我浏览了 AWS MSK 的“客户端身份验证”页面,发现这非常令人困惑。
任何人都可以帮我解决这个问题吗?我只是想部署这个使用 AWS MSK(3 个代理)的应用程序。
谢谢你。
java amazon-web-services spring-cloud-stream spring-kafka aws-msk
从 spring-docs,我可以看到
MANUAL - 消息侦听器负责确认()确认;之后,应用与 BATCH 相同的语义。
MANUAL_IMMEDIATE - 当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量。
但是如果侦听器提交偏移量到底有什么区别。为MANUAL模式做了哪些额外的步骤
假设我对同一个消息键有不同的值。
例如:
{
userid: 1,
email: user123@xyz.com }
{
userid: 1,
email: user456@xyz.com }
{
userid: 1,
email: user789@xyz.com }
Run Code Online (Sandbox Code Playgroud)
在上面的这种情况下,我只想要用户更新的最新值,即“user789@xyz.com”。
我的 kafka 流应该只给我第三个值,而不是前两个值。
apache-kafka apache-kafka-streams spring-kafka confluent-platform ksqldb
我已经实现了spring-kafka消费者应用程序。
我想要消费者应用程序正常关闭。
当前的消费者应用程序使用 Linux 命令终止 kill -9 pid
我现在正在使用@KafkaListener注释。
如果我退出 Spring Boot 应用程序,我想可靠地关闭消费者,我该怎么办?
我一直在使用@Predestory以可靠地退出 spring boot 应用程序,但我不太确定这是否与它有关。
java apache-kafka spring-boot kafka-consumer-api spring-kafka
我正在尝试创建一个简单的 Kafka 制作人。因为我是这个主题的新手,所以我遵循了一个教程。我按照视频中的建议创建了一个配置文件。这是我正在使用的配置文件。
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, KafkaProducerModel> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, KafkaProducerModel> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Run Code Online (Sandbox Code Playgroud)
当我尝试运行我的应用程序时出现以下错误。
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerGroupMetadata
at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
at …Run Code Online (Sandbox Code Playgroud) java apache-kafka spring-boot kafka-producer-api spring-kafka
spring-kafka ×10
apache-kafka ×9
java ×6
spring-boot ×5
spring ×3
aws-msk ×1
kotlin ×1
ksqldb ×1