再次使用来自 kafka 日志压缩主题的消息

car*_*ojc 2 apache-kafka spring-kafka

我有一个带有 Kafka 消费者的 Spring 应用程序,使用 @KafkaListerner 注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的场景。以编程方式实现这一目标的最佳方法是什么?我们不控制 Kafka 主题配置。

Gar*_*ell 6

    @KafkaListener(...)
    public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        if (this.resetNeeded) {
            consumer.seekToBeginning(consumer.assignment());
            this.resetNeeded = false;
        }
    }
Run Code Online (Sandbox Code Playgroud)

如果您想在侦听器空闲(无记录)时重置,您可以启用空闲事件并通过侦听 a ListenerContainerIdleEventin anApplicationListener@EventListener方法来执行搜索。

该事件具有对消费者的引用。

编辑

@SpringBootApplication
public class So58769796Application {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("One:" + key + ":" + value);
    }

    @KafkaListener(id = "so58769796a", topics = "so58769796")
    public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Two:" + key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    boolean reset;

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            this.reset = true;
        };
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        if (this.reset && event.getListenerId().startsWith("so58769796-")) {
            event.getConsumer().seekToBeginning(event.getConsumer().assignment());
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

spring.kafka.listener.idle-event-interval=5000
Run Code Online (Sandbox Code Playgroud)

编辑2

这是另一种技术 - 在这种情况下,我们在每次应用程序启动时(和按需)倒带......

spring.kafka.listener.idle-event-interval=5000
Run Code Online (Sandbox Code Playgroud)