VB_*_*VB_ 5 java scala shuffle apache-spark rdd
我有 5 个分区-RDD 和 5 个工人/执行者。如何让 Spark 将每个 RDD 的分区保存在不同的工作人员(IP)上?
如果我说 Spark 可以在一个 worker 上保存几个分区,而在其他 worker 上保存 0 个分区,我说得对吗?意思是,我可以指定分区数,但 Spark 仍然可以在单个节点上缓存所有内容。
复制不是一种选择,因为 RDD 很大。
RDD 的getPreferredLocations方法不提供分区将存储在指定节点上的 100% 保证。Spark 将在 期间尝试spark.locality.wait,但之后,Spark 将在不同的节点上缓存分区。
作为一种解决方法,您可以将非常高的值设置为spark.locality.wait并覆盖getPreferredLocations。坏消息 - 你不能用 Java 做到这一点,你需要编写 Scala 代码。至少 Scala 内部封装了 Java 代码。IE:
class NodeAffinityRDD[U: ClassTag](prev: RDD[U]) extends RDD[U](prev) {
val nodeIPs = Array("192.168.2.140","192.168.2.157","192.168.2.77")
override def getPreferredLocations(split: Partition): Seq[String] =
Seq(nodeIPs(split.index % nodeIPs.length))
}
Run Code Online (Sandbox Code Playgroud)
SparkContext 有 makeRDD 方法。这种方法缺乏文档。据我了解,我可以指定首选位置,然后将高值设置为spark.locality.wait。坏消息 - 首选位置将在第一次 shuffle/join/cogroup 操作中被丢弃。
这两种方法都有一个缺点,spark.locality.wait如果某些节点不可用,太高会导致集群饿死。
我有多达 10,000 个sales-XXX.parquet文件,每个文件代表不同地区不同商品的销售情况。每个都sales-XXX.parquet可以从几 KB 到几 GB 不等。sales-XXX.parquet在 HDFS 中,所有这些可能会占用多达数十或数百 GB。我需要对所有销售进行全文搜索。我必须sales-XXX.parquet用 Lucene 一一索引。现在我有两个选择: