小编Dyn*_*ite的帖子

Spring Kafka:记录侦听器与批处理侦听器

对于 spring-kafka,有两种类型的 Kafka 监听器。

记录听众

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenSingle(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    /* Process my kafka message */
}
Run Code Online (Sandbox Code Playgroud)

批量监听器

/*
    Consumer factory is initialized with setBatchListener(true)
*/

@KafkaListener(groupId = "group1", topics = {"my.topic"})
public void listenBatch(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) throws Exception {
    messages.forEach({
        /* Process my kafka message */
    });
}
Run Code Online (Sandbox Code Playgroud)

根据文档,它似乎对 Kafka 消费者(无论如何都会轮询多条消息)没有任何影响。

然后我不明白为什么我应该使用批处理侦听器而不是另一个,因为批处理侦听器有一些记录侦听器没有的限制(拦截器、偏移管理等)?

也许我误解了什么?批量监听器有什么好处?

java spring apache-kafka spring-kafka

9
推荐指数
1
解决办法
8506
查看次数

Spark缓存的RDD计算了n次

我遇到了Spark应用程序问题。这是我的代码的简化版本:

def main(args: Array[String]) {
    // Initializing spark context
    val sc = new SparkContext()
    val nbExecutors = sc.getConf.getInt("spark.executor.instances", 3)
    System.setProperty("spark.sql.shuffle.partitions", nbExecutors.toString)

    // Getting files from TGZ archives
    val archivesRDD: RDD[(String,PortableDataStream)] = utils.getFilesFromHDFSDirectory("/my/dir/*.tar.gz") // This returns an RDD of tuples containing (filename, inpustream)
    val filesRDD: RDD[String] = archivesRDD.flatMap(tgzStream => {
        logger.debug("Getting files from archive : "+tgzStream._1)
        utils.getFilesFromTgzStream(tgzStream._2)
    })

    // We run the same process with 3 different "modes"
    val modes = Seq("mode1", "mode2", "mode3")

    // We cache the RDD before
    val …
Run Code Online (Sandbox Code Playgroud)

performance scala hadoop-yarn apache-spark

5
推荐指数
1
解决办法
90
查看次数