我想使用RDD对Tuple2<byte[], obj>,但byte[]具有相同内容的s被视为不同的值,因为它们的参考值是不同的.
我没有看到任何传递自定义比较器.我可以使用明确的字符集将其转换byte[]为a String,但我想知道是否有更有效的方法.
自定义比较器不足,因为Spark使用hashCode对象组织分区中的键.(至少HashPartitioner会这样做,你可以提供一个可以处理数组的自定义分区器)
包装的阵列提供适当的equals和hashCode应该解决的问题.一个轻量级的包装应该可以做到这一点:
class SerByteArr(val bytes: Array[Byte]) extends Serializable {
override val hashCode = bytes.deep.hashCode
override def equals(obj:Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}
Run Code Online (Sandbox Code Playgroud)
快速测试:
import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.
// let's use the wrapper instead
val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1266 次 |
| 最近记录: |