Rap*_*oth 6 scala apache-spark
有时,Spark会以低效的方式“优化”数据框架计划。考虑以下Spark 2.1中的示例(也可以在Spark 1.6中复制):
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
df_result
.coalesce(1)
.saveAsTable(tablename)
Run Code Online (Sandbox Code Playgroud)
在此示例中,我想在对数据帧进行昂贵的转换后写入1个文件(这只是一个演示此问题的示例)。Spark向上移动coalesce(1),使得UDF仅应用于包含1个分区的数据帧,从而破坏了并行性(有趣的repartition(1)是,这种行为不起作用)。
概括地说,当我想在转换的某个部分中增加并行度,但此后降低并行度时,就会发生此行为。
我发现了一种解决方法,包括缓存数据框,然后触发对数据框的完整评估:
val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")
val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})
val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache
df_result.rdd.count // trigger computation
df_result
.coalesce(1)
.saveAsTable(tablename)
Run Code Online (Sandbox Code Playgroud)
我的问题是:在这种情况下,还有另一种方法可以告诉Spark不降低并行度吗?
小智 7
实际上,并不是由于SparkSQL的优化,SparkSQL不会更改Coalesce运算符的位置,如执行的计划所示:
Coalesce 1
+- *Project [value#2, UDF(value#2) AS udfResult#11]
+- *SerializeFromObject [input[0, double, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
Run Code Online (Sandbox Code Playgroud)
我引用了合并API描述中的一段:
注意:jira SPARK-19399添加了此段。因此,它不应在2.0的API中找到。
但是,如果您要进行大量合并,例如将numPartitions = 1合并,则可能会导致您的计算在少于您希望的节点上进行(例如,在numPartitions = 1的情况下为一个节点)。为避免这种情况,您可以调用重新分区。这将增加一个随机播放步骤,但是意味着当前的上游分区将并行执行(无论当前分区是什么)。
合并API不会执行随机播放,但是会导致以前的RDD与当前的RDD之间的依赖关系狭窄。由于RDD是惰性评估,因此实际上是使用合并分区来完成计算的。
为了防止这种情况,您应该使用重新分区API。
| 归档时间: |
|
| 查看次数: |
2066 次 |
| 最近记录: |