Jam*_*mes 3 apache-spark apache-spark-sql spark-dataframe apache-spark-mllib
这个问题解释了Spark的随机拆分是如何工作的,Sparks RDD.randomSplit实际上是如何拆分RDD的,但是我不明白Spark 如何跟踪哪些值进入了一个拆分,以便那些相同的值不会进入第二个拆分。
如果我们看一下randomSplit的实现:
def randomSplit(weights: Array[Double], seed: Long): Array[DataFrame] = {
// It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
// constituent partitions each time a split is materialized which could result in
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic.
val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
new DataFrame(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted))
}.toArray
}
Run Code Online (Sandbox Code Playgroud)
我们可以看到它创建了两个共享相同sqlContext并带有两个不同Sample(rs)的DataFrame。
这两个DataFrame如何相互通信,以便第一个值不包含在第一个值中?
数据是否被提取两次?(假设sqlContext从数据库中选择,选择是否被执行两次?)。
它与对RDD进行采样完全相同。
假设您具有权重数组(0.6, 0.2, 0.2),Spark将为每个范围生成一个DataFrame (0.0, 0.6), (0.6, 0.8), (0.8, 1.0)。
当需要读取结果DataFrame时,Spark会越过父DataFrame。对于每个项目,生成一个随机数,如果该数字落在指定范围内,则发出该项目。所有子DataFrame共享相同的随机数生成器(技术上,具有相同种子的不同生成器),因此随机数序列是确定性的。
对于最后一个问题,如果您没有缓存父DataFrame,则每次计算输出DataFrame时都会重新获取输入DataFrame的数据。
| 归档时间: |
|
| 查看次数: |
3103 次 |
| 最近记录: |