至于Spark 1.6+,支持自定义分区位置的唯一API是在创建RDD时:
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
Run Code Online (Sandbox Code Playgroud)
尽管在某些情况下非常有用(例如,当RDD.compute()必须访问一些本地资源时,而不仅仅是HDFS).这是暴露这种设置的唯一地方,但在第一次洗牌之后它将很快被丢弃(下游分区将从其最大的父级继承preferredLocation)
// snippet from org.apache.spark.rdd.ShuffledRDD.scala
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
Run Code Online (Sandbox Code Playgroud)
和cogroup/join(其中下游分区将使用具有显式分区程序的第一个父级).
我想知道这个设计是否是故意的,或者存在更好的托管解决方案.您认为为shuffled/coalesced/cogrouped/join RDD指定preferredLocation的最佳解决方案是什么?我是否必须编写自己的RDD实例才能实现此目的?
非常感谢您的洞察力.
更新我推测了一个可能的解决方案,令我惊讶的是不起作用:
在Apache Spark cogroup中,如何确保不移动1个> 2个操作数的RDD?
所以我删除了答案,如果你有一些有用的东西,欢迎你在这里分享,否则我们将不得不等待https://issues.apache.org/jira/browse/SPARK-18078