标签: spring-kafka

Spring Kafka Consumer Retry

我正在使用Spring Kafka使用者从主题中获取消息并将其持久保存到数据库中.如果满足失败条件,比如说db不可用,kafka使用者库是否提供重试机制?如果是,有没有办法设置不同的重试间隔,如第一次重试应在5分钟后完成,第二次在30分钟后完成,第3次在1小时后等.

apache-kafka spring-boot kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
9166
查看次数

Apache Kafka:重播主题中的消息

我正在考虑使用Apache Kafka作为事件存储来在微服务中存储事件.

我通过各种博客阅读的一件事是,Kafka可以被认为是单一的事实来源,其中Kafka日志将存储给定主题的所有事件.

我想知道Kafka是否有能力从一开始就重播消息(例如,如果发生硬盘驱动器/网络崩溃)?

(请注意,我发现在主题目录下的/ tmp文件夹中存储了一些日志).有没有人知道可以调用以重播主题中的消息的任何命令(如果有的话)?

apache-kafka spring-kafka

9
推荐指数
2
解决办法
8026
查看次数

Kafka Listener的Swagger文档

我使用Swagger为rest API生成一个文档,但我正在构建Kafka监听器并希望为它生成一个文档.我们是否有可能使用Swagger或类似的东西?

谢谢,Manish

spring swagger swagger-ui spring-kafka

9
推荐指数
1
解决办法
1355
查看次数

反序列化后如何处理SerializationException

我在Spring Kafka设置中使用Avro和Schema注册表.

我想以某种方式处理SerializationException,反序列化期间可能会抛出.

我找到了以下两个资源:

https://github.com/spring-projects/spring-kafka/issues/164

如何配置spring-kafka忽略错误格式的消息?

这些资源表明我返回null而不是SerializationException在反序列化和监听时抛出KafkaNull.这个解决方案很好用.

但是,我希望能够抛出异常而不是返回null.

KIP-161KIP-210为处理异常提供了更好的功能.我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka.

有谁知道如何捕获SerializationExceptionSpring Boot?

我使用的是Spring Boot 2.0.2

编辑:我找到了解决方案.

我宁愿抛出异常并抓住它而不是返回null或KafkaNull.我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring.如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null.

我想关闭容器,这样我就不会丢失任何消息.在生产中永远不应该期望SerializationException.只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException.无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题.

只需考虑将从您的消费者容器中捕获所有异常.在我的具体情况下,我只想关闭它,如果它是一个SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap …
Run Code Online (Sandbox Code Playgroud)

error-handling serialization deserialization spring-kafka

9
推荐指数
1
解决办法
3077
查看次数

如何将动态主题名称从环境变量传递给@KafkaListener(topics)

我正在写一个 Kafka 消费者。我需要将环境变量主题名称传递给@KafkaListener(topics = ...). 这是我迄今为止尝试过的:

 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener; 
 import org.springframework.stereotype.Service;

 @Service
 public class KafkaConsumer {

     @Autowired
     private EnvProperties envProperties;

     private final String topic = envProperties.getTopic();

     @KafkaListener(topics = "#{'${envProperties.getTopic()}'}", groupId = "group_id")
     public void consume(String message) {
        logger.info("Consuming messages " +envProperties.getTopic());
     }
}
Run Code Online (Sandbox Code Playgroud)

我在线路上遇到错误topics = "#{'${envProperties.getTopic()}'}",应用程序无法启动。

如何从环境变量动态设置此主题名称?

java spring-el spring-boot kafka-consumer-api spring-kafka

9
推荐指数
1
解决办法
3357
查看次数

如何在@KafkaListeners中将groupId设置为null

与:这个问题相关

我正在尝试通过 @KafkaListener 阅读压缩主题。我希望每个消费者每次都能阅读整个主题。

我无法为每个消费者生成唯一的 groupId。所以我想使用一个空的groupid。

我尝试配置容器和消费者以将 groupId 设置为 null,但都不起作用。

这是我的容器配置:

 ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Set ackMode to manual and never commit, we are reading from the beginning each time
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckOnError(false);
        // Remove groupId, we are consuming all partitions here
        factory.getContainerProperties().setGroupId(null);
        // Enable idle event in order to detect when init phase is over
        factory.getContainerProperties().setIdleEventInterval(1000L);
Run Code Online (Sandbox Code Playgroud)

还尝试强制消费者配置:

Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
        // Override group id property to force "null"
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
        ConsumerFactory<Object, Object> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka spring-kafka

9
推荐指数
1
解决办法
2万
查看次数

如何使用Spring Kafka生产者发送批量数据

目前我有这样的代码:

KafkaTemplate<String, String> kafkaTemplate;

List<Pet> myData;

for(Pet p: myData) {
  String json = objectWriter.writeValueAsString(p)
  kafkaTemplate.send(topic, json)
}

Run Code Online (Sandbox Code Playgroud)

所以每个列表项都是一一发送的。如何一次发送整个列表?

java apache-kafka spring-kafka

9
推荐指数
2
解决办法
2万
查看次数

Spring Kafka:记录侦听器与批处理侦听器

对于 spring-kafka,有两种类型的 Kafka 监听器。

记录听众

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    /* Process my kafka message */
}
Run Code Online (Sandbox Code Playgroud)

批量监听器

/*
    Consumer factory is initialized with setBatchListener(true)
*/

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
    messages.forEach({
        /* Process my kafka message */
    });
}
Run Code Online (Sandbox Code Playgroud)

根据文档,它似乎对 Kafka 消费者(无论如何都会轮询多条消息)没有任何影响。

然后我不明白为什么我应该使用批处理侦听器而不是另一个,因为批处理侦听器有一些记录侦听器没有的限制(拦截器、偏移管理等)?

也许我误解了什么?批量监听器有什么好处?

java spring apache-kafka spring-kafka

9
推荐指数
1
解决办法
8506
查看次数

在我的 Spring Boot 应用程序中实现 Kafka 健康检查的更简洁有效的方法是什么?

我有一个 spring boot (2.1.6) 应用程序,它既消费又向(组织范围内的)公共 kafka 实例生成消息。我正在尝试使用 spring 执行器在我的应用程序中为这个 kafka 代理实施健康检查,但我面临着一系列与性能和日志记录相关的问题。spring boot 2.0 中内置了一个健康指示器,但由于一些明显的问题,他们将其删除。

这是我实现的健康检查类:

@Component
public class KafkaHealthCheck implements HealthIndicator {

  private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

  private KafkaAdmin kafkaAdmin;

  private Map<String, Object> kafkaConfig;

  @Value("${application.topic}")
  private String topicName;

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
    this.kafkaAdmin = kafkaAdmin;

  }

  @PostConstruct
  public void setUpAdminClient() {
    kafkaConfig = new HashMap<>();
    kafkaConfig.putAll(kafkaAdmin.getConfig());
    kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  }

  @Override
  public Health health() {
    Long start = System.currentTimeMillis();
    try (AdminClient adminClient = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

9
推荐指数
0
解决办法
3314
查看次数

由于节点 -1 断开连接,Kafka 消费者错误取消了关联 ID 为 1 的正在进行的 API_VERSIONS 请求

我的消费者配置如下

package com.example.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-tenent1-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}
Run Code Online (Sandbox Code Playgroud)

但是,当应用程序启动时,我看到以下内容不断输出

由于节点 -1 已断开连接,已取消相关 ID 为 1 的进行中 API_VERSIONS 请求 …

java apache-kafka spring-boot spring-kafka

9
推荐指数
1
解决办法
3万
查看次数