HashPartitioner如何运作?

Soh*_*aib 77 scala partitioning apache-spark rdd

我读了一下文档HashPartitioner.不幸的是,除了API调用之外没有解释太多.我假设HashPartitioner根据键的哈希对分布式集进行分区.例如,如果我的数据是这样的

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
Run Code Online (Sandbox Code Playgroud)

因此,分区器会将其放入不同的分区,同一个键落在同一个分区中.但是我不明白构造函数参数的意义

new HashPartitoner(numPartitions) //What does numPartitions do?
Run Code Online (Sandbox Code Playgroud)

对于上述数据集,如果我这样做,结果会有何不同

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
Run Code Online (Sandbox Code Playgroud)

那么HashPartitioner工作怎么样呢?

zer*_*323 140

好吧,让我们让你的数据集更有趣:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)
Run Code Online (Sandbox Code Playgroud)

我们有六个要素:

rdd.count
Run Code Online (Sandbox Code Playgroud)
Long = 6
Run Code Online (Sandbox Code Playgroud)

没有分区:

rdd.partitioner
Run Code Online (Sandbox Code Playgroud)
Option[org.apache.spark.Partitioner] = None
Run Code Online (Sandbox Code Playgroud)

和八个分区:

rdd.partitions.length
Run Code Online (Sandbox Code Playgroud)
Int = 8
Run Code Online (Sandbox Code Playgroud)

现在让我们定义小助手来计算每个分区的元素数量:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}
Run Code Online (Sandbox Code Playgroud)

由于我们没有分区器,因此我们的数据集在分区之间统一分布(Spark中的默认分区方案):

countByPartition(rdd).collect()
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Run Code Online (Sandbox Code Playgroud)

inital分布

现在让我们重新分区我们的数据集:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Run Code Online (Sandbox Code Playgroud)

由于传递的参数HashPartitioner定义了多个分区,我们期望一个分区:

rddOneP.partitions.length
Run Code Online (Sandbox Code Playgroud)
Int = 1
Run Code Online (Sandbox Code Playgroud)

由于我们只有一个分区,因此它包含所有元素:

countByPartition(rddOneP).collect
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(6)
Run Code Online (Sandbox Code Playgroud)

散列分割-1

请注意,shuffle之后的值的顺序是不确定的.

如果我们使用相同的方式 HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
Run Code Online (Sandbox Code Playgroud)

我们将获得2个分区:

rddTwoP.partitions.length
Run Code Online (Sandbox Code Playgroud)
Int = 2
Run Code Online (Sandbox Code Playgroud)

由于rdd按键数据分区将不再均匀分布:

countByPartition(rddTwoP).collect()
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(2, 4)
Run Code Online (Sandbox Code Playgroud)

因为有三个键只有两个不同的hashCodemod 值,numPartitions所以这里没有任何意外:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
Run Code Online (Sandbox Code Playgroud)
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Run Code Online (Sandbox Code Playgroud)

只是为了确认以上内容:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Run Code Online (Sandbox Code Playgroud)
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Run Code Online (Sandbox Code Playgroud)

散列分割-2-

最后HashPartitioner(7)我们得到七个分区,三个非空,每个分区有2个元素:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Run Code Online (Sandbox Code Playgroud)
Int = 7
Run Code Online (Sandbox Code Playgroud)
countByPartition(rddTenP).collect()
Run Code Online (Sandbox Code Playgroud)
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Run Code Online (Sandbox Code Playgroud)

散列分割-7-

摘要和说明

  • HashPartitioner 采用一个定义分区数的参数
  • 使用hash键将值分配给分区.hash函数可能因语言而异(Scala RDD可能会使用hashCode,DataSets使用MurmurHash 3,PySpark,portable_hash).

    在这种简单的情况下,key是一个小整数,你可以假设它hash是一个identity(i = hash(i)).

    Scala API用于nonNegativeMod根据计算的哈希确定分区,

  • 如果密钥的分配不均匀,则可能会在群集的一部分处于空闲状态时结束

  • 钥匙必须是可清洗的.你可以查看我对A列表的答案作为PySpark的reduceByKey的关键,以了解有关PySpark特定问题的内容.HashPartitioner文档强调了另一个可能的问题:

    Java数组具有基于数组的身份而不是其内容的hashCode,因此尝试使用HashPartitioner 对RDD [Array [ ]]或RDD [(Array [ ],_)]进行分区将产生意外或不正确的结果.

  • 在Python 3中,您必须确保散列是一致的.请参阅什么是例外:字符串哈希的随机性应通过pyspark中的PYTHONHASHSEED平均值禁用?

  • 散列分区既不是单射的也不是满足的.可以将多个密钥分配给单个分区,并且某些分区可以保持为空.

  • 请注意,当与REPL定义的案例类(Apache Spark中的Case类相等)结合使用时,当前基于散列的方法在Scala中不起作用.

  • HashPartitioner(或任何其他Partitioner)洗牌数据.除非在多个操作之间重用分区,否则它不会减少要洗牌的数据量.


aba*_*rek 6

RDD分布,这意味着将其拆分为一定数量的零件。每个分区都可能位于不同的计算机上。带有参数的哈希分区器通过以下方式numPartitions选择要放置对的分区(key, value)

  1. 精确创建numPartitions分区。
  2. 地方(key, value)与数分区Hash(key) % numPartitions


归档时间:

查看次数:

30494 次

最近记录:

6 年,11 月 前