小智 20
回答你的问题:
独立模式使用与Mesos和Yarn模式相同的配置变量来设置执行程序的数量.该变量spark.cores.max定义了spark上下文中使用的最大核心数.默认值为无穷大,因此Spark将使用群集中的所有核心.spark.task.cpus变量定义Spark将为单个任务分配的CPU数量,默认值为1.使用这两个变量,您可以定义群集中最大并行任务数.
创建RDD子类时,您可以定义运行任务的计算机.这在getPreferredLocations方法中定义.但是由于方法签名表明这只是一个偏好,所以如果Spark检测到一台机器不忙,它将在这个空闲机器中启动任务.但是,我不知道Spark使用的机制来了解哪些机器处于空闲状态.为了实现局部性,我们(Stratio)决定让每个Partions更小,这样任务就可以减少时间并实现局部性.
每个Spark操作的任务数量根据RDD分区的长度定义.此向量是getPartitions方法的结果,如果要开发新的RDD子类,则必须覆盖该方法.此方法返回RDD的拆分方式,信息位置和分区.当您使用联合或联接操作连接两个或多个RDD时,生成的RDD的任务数是操作中涉及的RDD的最大任务数.例如:如果您加入具有100个任务的RDD1和具有1000个任务的RDD2,则生成的RDD的下一个操作将具有1000个任务.请注意,大量分区不一定是更多数据的同义词.
我希望这将有所帮助.
我同意@jlopezmat关于Spark如何选择其配置.关于您的测试代码,由于textFile实现方式,您将看到两个任务.来自SparkContext.scala:
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
Run Code Online (Sandbox Code Playgroud)
如果我们检查的价值是什么defaultMinPartitions:
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
Run Code Online (Sandbox Code Playgroud)
Spark根据原始数据集中的分区数量来选择任务数量。如果您使用 HDFS 作为数据源,则默认情况下分区数等于 HDFS 块数。您可以通过多种不同的方式更改分区的数量。前两个:作为SparkContext.textFile方法的额外参数;通过调用该RDD.repartion方法。
| 归档时间: |
|
| 查看次数: |
5521 次 |
| 最近记录: |