如何确保RDD的每个分区都有一些数据

Rav*_*jan 2 apache-spark

我的RDD为36个元素.我有一个3个节点的集群,每个节点有4个核心.我已经将RDD重新划分为36个部分,以便每个分区可能有一个要处理的元素,但是整个36个元素被分区,这样只有4个部分每个有9个元素,其余的部分都是空的,因此无需处理和服务器资源未得到充分利用.

如何重新分区数据以确保每个部分都有一些数据需要处理?如何确保每个零件都有3个要处理的元素?

eli*_*sah 5

根据定义,repartition(numPartitions)随机重新调整RDD中的数据以创建更多或更少的分区并在它们之间进行平衡,这总是会使网络上的所有数据混乱.

Apache Spark提供的保证是均匀分布的,但这不会产生每个分区完全相同的元素数.(该数据集的大小也非常小!)

您可以考虑使用HashPartitioner:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length))
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int]

scala> countByPartition(rdd).collect
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)

scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3)
Run Code Online (Sandbox Code Playgroud)

我从zero323的答案中借用了关于HashPartitioner如何工作的例子和助手

我希望这有帮助 !

编辑:

如果您愿意,请执行以下操作:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12) 
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29

scala> countByPartition(rdd).collect
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)
Run Code Online (Sandbox Code Playgroud)

结果不一定相同.