Spring Kafka:记录侦听器与批处理侦听器

Dyn*_*ite 9 java spring apache-kafka spring-kafka

对于 spring-kafka,有两种类型的 Kafka 监听器。

记录听众

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    /* Process my kafka message */
}
Run Code Online (Sandbox Code Playgroud)

批量监听器

/*
    Consumer factory is initialized with setBatchListener(true)
*/

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
    messages.forEach({
        /* Process my kafka message */
    });
}
Run Code Online (Sandbox Code Playgroud)

根据文档,它似乎对 Kafka 消费者(无论如何都会轮询多条消息)没有任何影响。

然后我不明白为什么我应该使用批处理侦听器而不是另一个,因为批处理侦听器有一些记录侦听器没有的限制(拦截器、偏移管理等)?

也许我误解了什么?批量监听器有什么好处?

小智 6

在我的用例中,优势不是来自对 Kafka 的处理,而是来自侦听器中的后续处理。例如,如果您必须在侦听器的消息处理中调用 REST API,则可以使用批处理侦听器以批量方式执行此操作。您可以将整个列表传递给 API。当然,外部API也必须支持批量操作。另一个示例可能是针对处理 Kafka 记录时访问的数据库进行批量处理。

例如:

@KafkaListener
public void receive(List<AnyPojo> pojo) {
  myPojoRepository.saveAll(pojo);
}
Run Code Online (Sandbox Code Playgroud)

如果您在没有批处理的情况下执行此操作,这将导致每个记录都有一个新事务,这比批量/批处理慢得多:

@KafkaListener
public void receive(AnyPojo pojo) {
  myPojoRepository.save(pojo);
}
Run Code Online (Sandbox Code Playgroud)