如何在Spark中按键分区?

BAR*_*BAR 10 scala apache-spark rdd

鉴于HashPartitioner文档说:

[HashPartitioner]使用Java的Object.hashCode实现基于散列的分区.

说我想DeviceData通过它进行分区kind.

case class DeviceData(kind: String, time: Long, data: String)
Run Code Online (Sandbox Code Playgroud)

RDD[DeviceData]通过覆盖deviceData.hashCode()方法并仅使用哈希码来分区是否正确kind

但是考虑到HashPartitioner需要一些分区参数,我很困惑我是否需要事先知道种类的数量以及如果有多种类型而不是分区会发生什么?

如果我将分区数据写入磁盘,它会在读取时保持分区吗?

我的目标是致电

  deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)
Run Code Online (Sandbox Code Playgroud)

并且在迭代器中只有DeviceData相同的kind值.

Jus*_*ony 8

如何做一个groupByKey使用kind.或另一种PairRDDFunctions方法.

你让我觉得你并不真正关心分区,只是你在一个处理流程中得到了所有特定的类型?

配对功能允许:

rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
   .foreachPartition(...)
Run Code Online (Sandbox Code Playgroud)

但是,您可能会更喜欢更安全的东西:

rdd.keyBy(_.kind).reduceByKey(....)
Run Code Online (Sandbox Code Playgroud)

或者mapValues许多其他一对功能,保证您整体获得作品


zer*_*323 8

通过覆盖deviceData.hashCode()方法并仅使用类型的哈希码对RDD [DeviceData]进行分区是否正确?

它不会.如果您使用Java Object.hashCode文档,您将找到以下有关一般合同的信息hashCode:

如果两个对象根据equals(Object)方法相等,则对两个对象中的每一个调用hashCode方法必须生成相同的整数结果.

因此,除非纯粹基于kind设备的平等概念符合您的用例,并且我严重怀疑它的确如此,修补HashCode所需的分区是一个坏主意.在一般情况下,您应该实现自己的分区程序,但这不是必需的.

因为,除了SQL和GraphX中的特殊方案之外,partitionBy仅在PairRDD创建RDD[(String, DeviceData)]和使用plain 时才有效HashPartitioner

deviceDataRdd.map(dev => (dev.kind, dev)).partitionBy(new HashPartitioner(n))
Run Code Online (Sandbox Code Playgroud)

请记住,在kind基数较低或分布严重偏差的情况下使用它进行分区可能不是最佳解决方案.

  • 没有.这里有一个逻辑谬误 - >"除非纯粹基于某种设备的平等概念适合你的用例,我严重怀疑它,修补HashCode以获得所需的分区是一个坏主意"HashCode基于Kind并不意味着Equality完全基于Kind.就是这样,*如果*2记录是相等的,它们具有相同的种类,并且很容易满足. (5认同)