Dyn*_*ite 9 java spring apache-kafka spring-kafka
对于 spring-kafka,有两种类型的 Kafka 监听器。
记录听众:
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
/* Process my kafka message */
}
Run Code Online (Sandbox Code Playgroud)
和批量监听器:
/*
Consumer factory is initialized with setBatchListener(true)
*/
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
messages.forEach({
/* Process my kafka message */
});
}
Run Code Online (Sandbox Code Playgroud)
根据文档,它似乎对 Kafka 消费者(无论如何都会轮询多条消息)没有任何影响。
然后我不明白为什么我应该使用批处理侦听器而不是另一个,因为批处理侦听器有一些记录侦听器没有的限制(拦截器、偏移管理等)?
也许我误解了什么?批量监听器有什么好处?
小智 6
在我的用例中,优势不是来自对 Kafka 的处理,而是来自侦听器中的后续处理。例如,如果您必须在侦听器的消息处理中调用 REST API,则可以使用批处理侦听器以批量方式执行此操作。您可以将整个列表传递给 API。当然,外部API也必须支持批量操作。另一个示例可能是针对处理 Kafka 记录时访问的数据库进行批量处理。
例如:
@KafkaListener
public void receive(List<AnyPojo> pojo) {
myPojoRepository.saveAll(pojo);
}
Run Code Online (Sandbox Code Playgroud)
如果您在没有批处理的情况下执行此操作,这将导致每个记录都有一个新事务,这比批量/批处理慢得多:
@KafkaListener
public void receive(AnyPojo pojo) {
myPojoRepository.save(pojo);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
8506 次 |
最近记录: |