Sot*_*her 4 python distributed apache-spark pyspark
我有许多火花数据帧,我需要在其中执行以下操作:
1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes
Run Code Online (Sandbox Code Playgroud)
现在,上述每个操作都需要不同数量的分区.选择行需要许多分区,例如100个分区.合并需要很少的分区,比如10个分区.
所以,我真的希望它像这样工作:
1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes
Run Code Online (Sandbox Code Playgroud)
现在,如何强制它在步骤1和2之间以及2到3之间重新分配?
我知道,当我打电话给data = data.repartition(7)它时,它会被懒惰地评估,所以它只是在实际保存时重新分配.
所以,我一直这样做:
1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法强迫它在这里重新分配?有没有比运行count()数据帧更好的方法?
由于对spark中数据帧的所有转换都进行了懒惰评估,因此您需要执行一个操作来实际执行转换.目前没有其他方法可以强制进行转换.
可以在文档中找到所有可用的数据帧操作(查看操作).在您的情况下,而不是使用count()强制转换,您可以使用first()哪个应该明显更便宜.
在步骤2.5中,您可以替换使用repartition(),coalesce()因为它将避免完全随机播放.当新的分区数量少于之前时,这通常是有利的,因为它将最小化数据移动.
编辑:
如果您不使用任何操作,只需执行以下操作即可回答您的问题:1)重新分区,2)火花数据帧转换,3)重新分区.由于优化火花对变换执行,似乎并不总是遵循该顺序.我制作了一个小测试程序来测试它:
val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y")
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b")
df1.explain(true)
Run Code Online (Sandbox Code Playgroud)
这将返回有关如何计算数据帧的信息.
== Parsed Logical Plan ==
'Filter NOT ('y = b)
+- Repartition 5, true
+- Filter NOT (x#5 = 1.0)
+- Repartition 10, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
+- LogicalRDD [_1#2, _2#3]
== Analyzed Logical Plan ==
x: double, y: string
Filter NOT (y#6 = b)
+- Repartition 5, true
+- Filter NOT (x#5 = 1.0)
+- Repartition 10, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
+- LogicalRDD [_1#2, _2#3]
== Optimized Logical Plan ==
Repartition 5, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
+- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
+- LogicalRDD [_1#2, _2#3]
== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *Project [_1#2 AS x#5, _2#3 AS y#6]
+- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
+- Scan ExistingRDD[_1#2,_2#3]
Run Code Online (Sandbox Code Playgroud)
从这里可以看出,该repartition(10)步骤不包括在内,似乎在优化过程中已被删除.
| 归档时间: |
|
| 查看次数: |
3636 次 |
| 最近记录: |