San*_*p B 3 spring spring-kafka
我们想在一定间隔后(例如每5分钟)消耗一次记录.消费者属性是标准的:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(300000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
return factory;
}
Run Code Online (Sandbox Code Playgroud)
即使我更改了属性,setPollTimeout
它在定义的间隔(5分钟)后也不会轮询,它会在30秒后继续轮询,这是我的日志:
2018-01-23 18:07:26.875 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 2
2018-01-23 18:07:56.901 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 4
Run Code Online (Sandbox Code Playgroud)
我们尝试使用窗口化聚合构建一个kafka流应用程序,并计划在y间隔后使用窗口x.
我可以在课堂上看到:KafkaMessageListenerContainer
,setConsumerTaskExecutor
设置:
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Run Code Online (Sandbox Code Playgroud)
但是,我们如何配置此(频率)线程池何时轮询记录.任何帮助赞赏.
您无法控制消费者轮询的速率,pollTimeout是poll()
等待新记录到达的时间长度.如果新记录更频繁地到达,它将不会等待那么久.
如果您希望控制接收记录的速率,只需使用DefatulKafkaConsumerFactory
创建消费者并随时轮询它.
你不能使用它@KafkaListener
- 你必须自己处理记录.
该功能是在2.3版本中引入的。
从版本2.3开始,ContainerProperties提供了一个 idleBetweenPolls选项,让侦听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。实际的睡眠间隔被选择为提供的选项中的最小值以及 max.poll.interval.ms 消费者配置和当前记录批处理时间之间的差异。
https://docs.spring.io/spring-kafka/reference/html/
KafkaListenerConfig.java
package br.com.sicredi.spi.icom.consumer.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaListenerConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setIdleBetweenPolls(100); // 100 miliseconds
return factory;
}
private ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
// ...
return props;
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3663 次 |
最近记录: |