With spring-kafka, there are two types of Kafka listeners.
Record listeners:
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    /* Process my kafka message */
}
And batch listeners:
/*
 The listener container factory is configured with setBatchListener(true)
*/
@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics) throws Exception {
    // In batch mode, header values arrive as lists (one entry per record)
    messages.forEach(message -> {
        /* Process my kafka message */
    });
}
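According to the documentation, the choice doesn't seem to make any difference to the Kafka consumer itself, which polls multiple records either way.
So I don't understand why I should use a batch listener rather than a record listener, given that batch listeners have some limitations that record listeners don't (interceptors, offset management, etc.).
Maybe I'm misunderstanding something? What are the benefits of batch listeners?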
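The observation that the consumer polls multiple records either way can be made concrete with a small, stdlib-only Java model. It is a simplified sketch, not spring-kafka's actual container code: the class and method names (`ListenerDispatchDemo`, `dispatchPerRecord`, `dispatchPerBatch`) are hypothetical, each inner list stands for the records returned by one consumer `poll()`, and skipping empty polls is an assumption of the model. The poll is identical in both styles; only the number of listener invocations changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model (NOT spring-kafka's real container code).
// Each inner list stands for the records returned by one consumer poll().
// The poll is the same in both styles; only the hand-off to the listener differs.
class ListenerDispatchDemo {

    // Record-listener style: the listener is invoked once per record.
    static int dispatchPerRecord(List<List<String>> polls, Consumer<String> listener) {
        int invocations = 0;
        for (List<String> poll : polls) {
            for (String message : poll) {
                listener.accept(message);
                invocations++;
            }
        }
        return invocations;
    }

    // Batch-listener style: the listener is invoked once per poll with the whole list.
    static int dispatchPerBatch(List<List<String>> polls, Consumer<List<String>> listener) {
        int invocations = 0;
        for (List<String> poll : polls) {
            if (poll.isEmpty()) {
                continue; // assumption of this model: empty polls are not delivered
            }
            listener.accept(poll);
            invocations++;
        }
        return invocations;
    }

    public static void main(String[] args) {
        // Two hypothetical polls returning 3 and 2 records.
        List<List<String>> polls = List.of(List.of("m1", "m2", "m3"), List.of("m4", "m5"));

        List<String> seen = new ArrayList<>();
        int recordCalls = dispatchPerRecord(polls, seen::add);
        int batchCalls = dispatchPerBatch(polls, batch -> seen.addAll(batch));

        System.out.println("record listener calls: " + recordCalls); // prints "record listener calls: 5"
        System.out.println("batch listener calls: " + batchCalls);   // prints "batch listener calls: 2"
        System.out.println("records processed: " + seen.size());     // prints "records processed: 10" (5 per style)
    }
}
```

With the two sample polls, the record-style listener is called five times and the batch-style listener twice, while the same five records reach the listener in each case.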
I'm having a problem with a Spark application. Here is a simplified version of my code:
def main(args: Array[String]): Unit = {
  // Initializing spark context
  val sc = new SparkContext()
  val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
  System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)
  // Getting files from TGZ archives
  val archivesRDD: RDD[(String, PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inputstream)
  val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
    logger.debug("Getting files from archive : " + tgzStream._1)
    utils.getFilesFromTgzStream(tgzStream._2)
  })
  // We run the same process with 3 different "modes"
  val modes = Seq("mode1", "mode2", "mode3")
  // We cache the RDD before
  val …