Spring @KafkaListener在一定的时间间隔后执行并轮询记录

San*_*p B 3 spring spring-kafka

我们想在一定间隔后(例如每5分钟)消耗一次记录.消费者属性是标准的:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}
Run Code Online (Sandbox Code Playgroud)

即使我更改了属性,setPollTimeout它在定义的间隔(5分钟)后也不会轮询,它会在30秒后继续轮询,这是我的日志:

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4
Run Code Online (Sandbox Code Playgroud)

我们尝试使用窗口化聚合构建一个kafka流应用程序,并计划在y间隔后使用窗口x.

我可以在课堂上看到:KafkaMessageListenerContainer,setConsumerTaskExecutor设置:

if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }
Run Code Online (Sandbox Code Playgroud)

但是,我们如何配置此(频率)线程池何时轮询记录.任何帮助赞赏.

Gar*_*ell 7

您无法控制消费者轮询的速率,pollTimeout是poll()等待新记录到达的时间长度.如果新记录更频繁地到达,它将不会等待那么久.

如果您希望控制接收记录的速率,只需使用DefatulKafkaConsumerFactory创建消费者并随时轮询它.

你不能使用它@KafkaListener- 你必须自己处理记录.

  • 您可以使用 min.fetch.bytes 和 max.fetch.wait.ms 来控制它。但这并不精确。下一个版本 2.3 添加了 polls 属性之间的空闲来帮助解决此问题。 (2认同)

Gab*_*bel 6

该功能是在2.3版本中引入的。

从版本2.3开始,ContainerProperties提供了一个 idleBetweenPolls选项,让侦听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。实际的睡眠间隔被选择为提供的选项中的最小值以及 max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差异。

https://docs.spring.io/spring-kafka/reference/html/

KafkaListenerConfig.java

package br.com.sicredi.spi.icom.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        
        factory.getContainerProperties().setIdleBetweenPolls(100); // 100 miliseconds
        
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    private Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        // ... 
        return props;
    }
}
Run Code Online (Sandbox Code Playgroud)