gst*_*low 5 java spring apache-kafka kafka-producer-api spring-kafka
KafkaPropertiesjava 文档:
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
Run Code Online (Sandbox Code Playgroud)
我有你好世界应用程序,其中包含application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=latest
Run Code Online (Sandbox Code Playgroud)
在这种情况下,@KafkaListener将为所有条目调用方法。但预期结果是@KafkaListener仅针对我发送的最新 3 个选项调用该方法。我尝试使用另一个选项:
spring.kafka.consumer.auto-offset-reset=earlisest
Run Code Online (Sandbox Code Playgroud)
但行为是一样的。
你能解释一下这个东西吗?
代码示例:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("spring_kafka_topic", "foo1");
this.template.send("spring_kafka_topic", "foo2");
this.template.send("spring_kafka_topic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "spring_kafka_topic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
Run Code Online (Sandbox Code Playgroud)
行为不取决于
spring.kafka.consumer.auto-offset-reset
它仅取决于 spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit
如果我设置spring.kafka.consumer.enable-auto-commit=false- 我会看到所有记录。
如果我设置spring.kafka.consumer.enable-auto-commit=true- 我只看到最后 3 条记录。
请澄清spring.kafka.consumer.auto-offset-reset财产的含义
Spring Boot中KafkaProperties是这样做的:
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.autoCommitInterval != null) {
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
this.autoCommitInterval);
}
if (this.autoOffsetReset != null) {
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
this.autoOffsetReset);
}
Run Code Online (Sandbox Code Playgroud)
这buildProperties()是从buildConsumerProperties()which中使用的,依次在:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<Object, Object>(
this.properties.buildConsumerProperties());
}
Run Code Online (Sandbox Code Playgroud)
因此,如果您使用自己的ConsumerFactorybean 定义,请务必重用它们KafkaProperties:https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafka-extra-道具
更新
好的。我明白发生了什么事。
尝试添加此属性:
spring.kafka.consumer.enable-auto-commit=false
Run Code Online (Sandbox Code Playgroud)
这样我们就不会根据某个提交间隔进行异步自动提交。
我们应用程序中的逻辑基于latch.await(60, TimeUnit.SECONDS);. 当我们得到3预期的记录时,我们退出。这样,来自消费者的异步自动提交可能还不会发生。因此,下次运行应用程序时,消费者将从未提交的偏移量中轮询数据。
当我们关闭自动提交时,我们有一个AckMode.BATCH,它是同步执行的,并且我们能够看到该foo消费者组的主题中真正最新的记录。