Dmi*_*Zyr 2 performance scala query-optimization apache-spark apache-spark-sql
我com.datastax.spark:spark-cassandra-connector_2.11:2.4.0在运行 zeppelin notebooks 时使用,但不明白 spark 中两个操作之间的区别。第一个操作需要很多时间进行计算,第二个操作立即执行。有人可以向我解释两种操作之间的区别吗:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class SomeClass(val someField:String)
val timelineItems = spark.read.format("org.apache.spark.sql.cassandra").options(scala.collection.immutable.Map("spark.cassandra.connection.host" -> "127.0.0.1", "table" -> "timeline_items", "keyspace" -> "timeline" )).load()
//some simplified code:
val timelineRow = timelineItems
.map(x => {SomeClass("test")})
.filter(x => x != null)
.toDF()
.limit(4)
//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4
//second operation (executes immediately); 300 - just random number which doesn't affect the result
println(timelineRow.take(300).length) //return: 4
Run Code Online (Sandbox Code Playgroud)
您看到的是Limit(类似转换的操作)和CollectLimit(类似动作的操作)的实现之间的区别。然而,时间上的差异是非常具有误导性的,在一般情况下不是您可以预期的。
首先让我们创建一个 MCVE
spark.conf.set("spark.sql.files.maxPartitionBytes", 500)
val ds = spark.read
.text("README.md")
.as[String]
.map{ x => {
Thread.sleep(1000)
x
}}
val dsLimit4 = ds.limit(4)
Run Code Online (Sandbox Code Playgroud)
确保我们从干净的石板开始:
spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
调用count:
dsLimit4.count()
Run Code Online (Sandbox Code Playgroud)
并查看执行计划(来自 Spark UI):
spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Run Code Online (Sandbox Code Playgroud)
核心组件是
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
Run Code Online (Sandbox Code Playgroud)
这表明我们可以期待具有多个阶段的广泛操作。我们可以看到一个作业
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Run Code Online (Sandbox Code Playgroud)
Boolean = true
Run Code Online (Sandbox Code Playgroud)
有两个阶段
spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Run Code Online (Sandbox Code Playgroud)
dsLimit4.count()
Run Code Online (Sandbox Code Playgroud)
与八
spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Run Code Online (Sandbox Code Playgroud)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Optimized Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- Project
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject value#0.toString, obj#5: java.lang.String
+- Relation[value#0] text
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#12L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#15L])
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
+- *(1) Project
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- *(1) MapElements <function1>, obj#6: java.lang.String
+- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
+- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
Run Code Online (Sandbox Code Playgroud)
和一个
spark.sparkContext.statusTracker.getStageInfo(1).get.numTasks
Run Code Online (Sandbox Code Playgroud)
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
Run Code Online (Sandbox Code Playgroud)
任务分别。
现在让我们将它与
dsLimit4.take(300).size
Run Code Online (Sandbox Code Playgroud)
产生以下
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Run Code Online (Sandbox Code Playgroud)
虽然全局和局部限制仍然存在,但中间没有交换。因此,我们可以期待单阶段操作。请注意,规划器将限制缩小到更严格的值。
正如预期的那样,我们看到了一个新工作:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(0)
Run Code Online (Sandbox Code Playgroud)
只生成了一个阶段:
spark.sparkContext.statusTracker.getJobInfo(1).get.stageIds
Run Code Online (Sandbox Code Playgroud)
spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Run Code Online (Sandbox Code Playgroud)
只有一项任务
spark.sparkContext.statusTracker.getStageInfo(2).get.numTasks
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(0, 1)
Run Code Online (Sandbox Code Playgroud)
这对我们意味着什么?
countSpark 使用广泛转换并实际应用于LocalLimit每个分区的情况下,并将部分结果打乱以执行GlobalLimit.takeSpark 使用窄变换并LocalLimit仅在第一个分区上进行评估的情况下。显然,后一种方法不适用于第一个分区中的值数量低于请求的限制。
val dsLimit105 = ds.limit(105) // There are 105 lines
Run Code Online (Sandbox Code Playgroud)
在这种情况下,第一个count将使用与以前完全相同的逻辑(我鼓励您凭经验确认这一点),但take将采用完全不同的路径。到目前为止,我们只触发了两个作业:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Run Code Online (Sandbox Code Playgroud)
spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Run Code Online (Sandbox Code Playgroud)
现在如果我们执行
dsLimit105.take(300).size
Run Code Online (Sandbox Code Playgroud)
你会看到它需要 3 个更多的工作:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Run Code Online (Sandbox Code Playgroud)
Int = 8
Run Code Online (Sandbox Code Playgroud)
那么这里发生了什么?如前所述,评估单个分区在一般情况下不足以满足限制。在这种情况下,Spark 迭代地评估LocalLimit分区,直到GlobalLimit满足为止,增加每次迭代中采用的分区数量。
这种策略可能会对性能产生重大影响。单独启动 Spark 作业并不便宜,并且在某些情况下,当上游对象是广泛转换的结果时,事情可能会变得非常丑陋(在最好的情况下,您可以读取 shuffle 文件,但如果由于某种原因丢失了这些文件,Spark 可能会被迫重新执行所有依赖项)。
总结一下:
take是一个动作,在上游流程狭窄的特定情况下可以短路,使用前几个分区LocalLimits可以满足GlobalLimits。limit是一个转换,并且总是评估 all LocalLimits,因为没有迭代逃生舱口。虽然在特定情况下一个可以比另一个表现得更好,但两者不可互换,也不能保证总体上更好的性能。
| 归档时间: |
|
| 查看次数: |
1793 次 |
| 最近记录: |