强制分区存储在特定的执行器上

VB_*_*VB_ 5 java scala shuffle apache-spark rdd

我有 5 个分区-RDD 和 5 个工人/执行者。如何让 Spark 将每个 RDD 的分区保存在不同的工作人员(IP)上?

如果我说 Spark 可以在一个 worker 上保存几个分区,而在其他 worker 上保存 0 个分区,我说得对吗?意思是,我可以指定分区数,但 Spark 仍然可以在单个节点上缓存所有内容。

复制不是一种选择,因为 RDD 很大。

我发现的解决方法

获取首选位置

RDD 的getPreferredLocations方法不提供分区将存储在指定节点上的 100% 保证。Spark 将在 期间尝试spark.locality.wait,但之后,Spark 将在不同的节点上缓存分区。

作为一种解决方法,您可以将非常高的值设置为spark.locality.wait并覆盖getPreferredLocations。坏消息 - 你不能用 Java 做到这一点,你需要编写 Scala 代码。至少 Scala 内部封装了 Java 代码。IE:

class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {

  val nodeIPs = Array("192.168.2.140","192.168.2.157","192.168.2.77")

  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(nodeIPs(split.index % nodeIPs.length))
}
Run Code Online (Sandbox Code Playgroud)

SparkContext 的 makeRDD

SparkContext 有 makeRDD 方法。这种方法缺乏文档。据我了解,我可以指定首选位置,然后将高值设置为spark.locality.wait。坏消息 - 首选位置将在第一次 shuffle/join/cogroup 操作中被丢弃


这两种方法都有一个缺点,spark.locality.wait如果某些节点不可用,太高会导致集群饿死。

PS 更多上下文

我有多达 10,000 个sales-XXX.parquet文件,每个文件代表不同地区不同商品的销售情况。每个都sales-XXX.parquet可以从几 KB 到几 GB 不等。sales-XXX.parquet在 HDFS 中,所有这些可能会占用多达数十或数百 GB。我需要对所有销售进行全文搜索。我必须sales-XXX.parquet用 Lucene 一一索引。现在我有两个选择:

  1. 在 Spark 中保留 Lucene 索引。已经有解决方案,但看起来很可疑。有没有更好的解决方案?
  2. 将 Lucene 索引保留在本地文件系统中。然后我可以对每个工人的索引查找结果进行 map-reduce。但是这种方法要求每个工作节点保持等量的数据。我如何确保 Spark 将在每个工作节点上保留相同数量的数据?