Spark会使用此sortByKey/map/collect序列保留密钥顺序吗?

Met*_*est 1 hadoop scala bigdata apache-spark

让我们说,我们有这个.

val sx = sc.parallelize(Array((0, 39), (4, 47), (3, 51), (1, 98), (2, 61)))
Run Code Online (Sandbox Code Playgroud)

我们后来称之为.

val sy = sx.sortByKey(true)
Run Code Online (Sandbox Code Playgroud)

哪个会

sy = RDD[(0, 39), (1, 98), (2, 61), (3, 51), (4, 47)] 
Run Code Online (Sandbox Code Playgroud)

然后我们这样做

collected = sy.map(x => (x._2 / 10, x._2)).collect
Run Code Online (Sandbox Code Playgroud)

我们会不会得到以下内容.我的意思是,尽管改变了键值,原始的键顺序是否会被保留?

collected = [(3, 39), (9, 98), (6, 61), (5, 51), (4, 47)]
Run Code Online (Sandbox Code Playgroud)

Roh*_*tty 8

运用map()转化,并呼吁collect()不会改变返回的数组元素的顺序collect().为了证明这一点,我们只需要表明:

  • map不会修改RDD中元素的顺序
  • collect将始终在每次调用时以相同的数组顺序返回RDD的元素

第一点很容易证明.在引擎盖下,调用map()只是MapPartitionsRDD通过遍历每个分区并调用传递给map()分区中每个元素的函数参数来生成一个.因此,此处不修改排序,因为每个分区内的元素排序保持不变.

第二点可以通过仔细研究来证明collect().以下代码是实现collect()以及收集调用的函数.

来自RDD.scala:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
Run Code Online (Sandbox Code Playgroud)

来自SparkContext.scala:

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
Run Code Online (Sandbox Code Playgroud)

runJob()被调用的函数(它是一个重载方法)将Seq[Int]包含处理分区的顺序传递给另一个runJob()方法.此订单最终会冒泡到调度程序,这将确定操作将如何处理分区.因此,在这种情况下collect(),我们将始终按顺序从第一个开始处理分区.

因此,由于既map()不会collect()修改分区中的分区顺序或元素的顺序,也会每次都看到对于collect的结果的相同顺序.但是,如果您在收集之前应用需要随机播放的转换,则所有投注都将关闭,因为数据将被重新分区.