我正在使用Spring Kafka使用者从主题中获取消息并将其持久保存到数据库中.如果满足失败条件,比如说db不可用,kafka使用者库是否提供重试机制?如果是,有没有办法设置不同的重试间隔,如第一次重试应在5分钟后完成,第二次在30分钟后完成,第3次在1小时后等.
我正在考虑使用Apache Kafka作为事件存储来在微服务中存储事件.
我通过各种博客阅读的一件事是,Kafka可以被认为是单一的事实来源,其中Kafka日志将存储给定主题的所有事件.
我想知道Kafka是否有能力从一开始就重播消息(例如,如果发生硬盘驱动器/网络崩溃)?
(请注意,我发现在主题目录下的/ tmp文件夹中存储了一些日志).有没有人知道可以调用以重播主题中的消息的任何命令(如果有的话)?
我使用Swagger为rest API生成一个文档,但我正在构建Kafka监听器并希望为它生成一个文档.我们是否有可能使用Swagger或类似的东西?
谢谢,Manish
我在Spring Kafka设置中使用Avro和Schema注册表.
我想以某种方式处理SerializationException,反序列化期间可能会抛出.
我找到了以下两个资源:
https://github.com/spring-projects/spring-kafka/issues/164
这些资源表明我返回null而不是SerializationException在反序列化和监听时抛出KafkaNull.这个解决方案很好用.
但是,我希望能够抛出异常而不是返回null.
KIP-161和KIP-210为处理异常提供了更好的功能.我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka.
有谁知道如何捕获SerializationExceptionSpring Boot?
我使用的是Spring Boot 2.0.2
编辑:我找到了解决方案.
我宁愿抛出异常并抓住它而不是返回null或KafkaNull.我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring.如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null.
我想关闭容器,这样我就不会丢失任何消息.在生产中永远不应该期望SerializationException.只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException.无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题.
只需考虑将从您的消费者容器中捕获所有异常.在我的具体情况下,我只想关闭它,如果它是一个SerializationException
public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
//Only call super if the exception is SerializationException
if (thrownException instanceof SerializationException) {
//This will shutdown the container.
super.handle(thrownException, records, consumer, container);
} else {
//Wrap …Run Code Online (Sandbox Code Playgroud) 我正在写一个 Kafka 消费者。我需要将环境变量主题名称传递给@KafkaListener(topics = ...). 这是我迄今为止尝试过的:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@Autowired
private EnvProperties envProperties;
private final String topic = envProperties.getTopic();
@KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
public void consume(String message) {
logger.info("Consuming messages " +envProperties.getTopic());
}
}
Run Code Online (Sandbox Code Playgroud)
我在线路上遇到错误topics = "#{'${envProperties.getTopic()}'}",应用程序无法启动。
如何从环境变量动态设置此主题名称?
与:这个问题相关
我正在尝试通过 @KafkaListener 阅读压缩主题。我希望每个消费者每次都能阅读整个主题。
我无法为每个消费者生成唯一的 groupId。所以我想使用一个空的groupid。
我尝试配置容器和消费者以将 groupId 设置为 null,但都不起作用。
这是我的容器配置:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);
Run Code Online (Sandbox Code Playgroud)
还尝试强制消费者配置:
Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> …Run Code Online (Sandbox Code Playgroud) 目前我有这样的代码:
KafkaTemplate<String, String> kafkaTemplate;
List<Pet> myData;
for(Pet p: myData) {
String json = objectWriter.writeValueAsString(p)
kafkaTemplate.send(topic, json)
}
Run Code Online (Sandbox Code Playgroud)
所以每个列表项都是一一发送的。如何一次发送整个列表?
对于 spring-kafka,有两种类型的 Kafka 监听器。
记录听众:
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
/* Process my kafka message */
}
Run Code Online (Sandbox Code Playgroud)
和批量监听器:
/*
Consumer factory is initialized with setBatchListener(true)
*/
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
messages.forEach({
/* Process my kafka message */
});
}
Run Code Online (Sandbox Code Playgroud)
根据文档,它似乎对 Kafka 消费者(无论如何都会轮询多条消息)没有任何影响。
然后我不明白为什么我应该使用批处理侦听器而不是另一个,因为批处理侦听器有一些记录侦听器没有的限制(拦截器、偏移管理等)?
也许我误解了什么?批量监听器有什么好处?
我有一个 spring boot (2.1.6) 应用程序,它既消费又向(组织范围内的)公共 kafka 实例生成消息。我正在尝试使用 spring 执行器在我的应用程序中为这个 kafka 代理实施健康检查,但我面临着一系列与性能和日志记录相关的问题。spring boot 2.0 中内置了一个健康指示器,但由于一些明显的问题,他们将其删除。
这是我实现的健康检查类:
@Component
public class KafkaHealthCheck implements HealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);
private KafkaAdmin kafkaAdmin;
private Map<String, Object> kafkaConfig;
@Value("${application.topic}")
private String topicName;
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
this.kafkaAdmin = kafkaAdmin;
}
@PostConstruct
public void setUpAdminClient() {
kafkaConfig = new HashMap<>();
kafkaConfig.putAll(kafkaAdmin.getConfig());
kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
}
@Override
public Health health() {
Long start = System.currentTimeMillis();
try (AdminClient adminClient = …Run Code Online (Sandbox Code Playgroud) 我的消费者配置如下
package com.example.kafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-tenent1-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Run Code Online (Sandbox Code Playgroud)
但是,当应用程序启动时,我看到以下内容不断输出
由于节点 -1 已断开连接,已取消相关 ID 为 1 的进行中 API_VERSIONS 请求 …
spring-kafka ×10
apache-kafka ×7
java ×5
spring-boot ×4
spring ×2
spring-el ×1
swagger ×1
swagger-ui ×1