让我们假设以下每个时间点只运行一个Spark作业.
以下是我了解Spark中发生的事情:
SparkContext被创建,每个工作节点开始执行人.执行程序是单独的进程(JVM),它连接回驱动程序.每个执行程序都有驱动程序的jar.退出驱动程序,关闭执行程序.每个执行程序都可以保存一些分区.我明白那个
- 任务是通过序列化Function对象从驱动程序发送到执行程序的命令.
- 执行程序反序列化(使用驱动程序jar)命令(任务)并在分区上执行它.
但
如何将舞台分成这些任务?
特别:
在https://0x0fff.com/spark-architecture-shuffle中,随着图像解释了随机播放
我得到了规则的印象
每个阶段被分成#count-of-partitions任务,不考虑节点数量
对于我的第一张图片,我会说我有3个地图任务和3个减少任务.
对于来自0x0fff的图像,我会说有8个地图任务和3个减少任务(假设只有三个橙色和三个深绿色文件).
那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,无论多个操作(例如多个地图)是在一个任务内还是每个操作分成一个任务.
Spark的任务是什么?Spark工作人员如何执行jar文件?以及Apache Spark调度程序如何将文件拆分为任务?是相似的,但我觉得我的问题在那里得不到清楚.
jav*_*dba 44
你有一个非常好的大纲.回答你的问题
task 确实需要推出针对每个数据的每个分区stage.考虑每个分区可能位于不同的物理位置 - 例如HDFS中的块或本地文件系统的目录/卷.请注意,Stages 的提交是由DAG Scheduler.这意味着可以将不相互依赖的阶段提交到集群以便并行执行:这最大化了集群上的并行化功能.因此,如果我们的数据流中的操作可以同时发生,我们将期望看到启动多个阶段.
我们可以看到以下玩具示例中的操作,其中我们执行以下类型的操作:
那么我们最终会有多少阶段?
join一点的第三阶段取决于其他两个阶段这是玩具计划
val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")
Run Code Online (Sandbox Code Playgroud)
这是结果的DAG
现在:有多少任务?任务数量应该等于
(Stage*#Partitions in the stage)的总和
ped*_*iri 18
这可能有助于您更好地理解不同的部分:
Har*_*man 13
如果我理解正确,有2个(相关的)事情让你感到困惑:
1)什么决定了任务的内容?
2)什么决定了要执行的任务数量?
Spark的引擎在连续的rdds上"粘合" 简单的操作,例如:
rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count
Run Code Online (Sandbox Code Playgroud)
因此当rdd3(懒惰地)计算时,spark将为rdd1的每个分区生成一个任务,并且每个任务将执行过滤器和每行的映射以产生rdd3.
任务数由分区数决定.每个RDD都有一个已定义数量的分区.对于从HDFS读取的源RDD(例如,使用sc.textFile(...)),分区数是由输入格式生成的分割数.RDD上的某些操作可能导致RDD具有不同数量的分区:
rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).
Run Code Online (Sandbox Code Playgroud)
另一个例子是加入:
rdd3 = rdd1.join( rdd2 , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).
Run Code Online (Sandbox Code Playgroud)
(大多数)改变分区数量的操作涉及一个shuffle,当我们这样做时:
rdd2 = rdd1.repartition( 1000 )
Run Code Online (Sandbox Code Playgroud)
实际发生的是rdd1的每个分区上的任务需要产生一个可以被下一阶段读取的结束输出,以便使rdd2有1000个分区(他们是如何做的?哈希或排序).这方面的任务有时被称为"地图(侧面)任务".稍后在rdd2上运行的任务将作用于一个分区(rdd2!),并且必须弄清楚如何读取/组合与该分区相关的地图侧输出.这方面的任务有时被称为"减少(侧面)任务".
这两个问题是相关的:一个阶段中的任务数量是分区的数量(连续的rdds"胶合"在一起)和rdd的分区数量可以在阶段之间改变(通过指定一些分区的数量) shuffle导致操作例如).
一旦阶段的执行开始,其任务可以占用任务时隙.并发任务槽的数量是numExecutors*ExecutorCores.通常,这些可以由来自不同的非依赖阶段的任务占据.
| 归档时间: |
|
| 查看次数: |
41642 次 |
| 最近记录: |