chr*_*btk 5 reduce scala apache-spark
我正在使用带scala的spark,我有一个充满了tuple2的RDD,其中包含一个复杂的对象作为键和一个double.目标是在对象相同时添加双精度(频率).
为此,我已将我的对象定义如下:
case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{
def compare(that: SimpleCoocurrence) = {
if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos)
&&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos))
0
else
this.toString.compareTo(that.toString)
}
}
Run Code Online (Sandbox Code Playgroud)
现在我正在尝试使用reduceBykey:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup=>tup).reduceByKey(_+_)
println(coocRDD.count)
Run Code Online (Sandbox Code Playgroud)
但是,结果显示处理reducebykey之前和之后的RDD包含完全相同数量的元素.
如何使用tuple2 [SimpleCoocurrence,Double]执行reduceByKey?实现Ordered trait是告诉Spark如何比较我的对象的好方法吗?我应该只使用tuple2 [String,Double]吗?
谢谢,
reduceByKey不使用排序,但hashCode并equals确定哪些键是相同的.特别是,hashPartitioner通过散列对组密钥进行分组,使用相同hashCode的密钥落在同一分区上,以便在每个分区上进一步减少.
case类有一个默认的实现equals和hashCode.可能使用的测试数据具有不同的字段值,distance:Double使每个实例成为唯一对象.将其用作密钥将导致仅将相同的对象缩减为一个.
解决这个问题的一种方法是为您定义一个键case class和一个对象的加法方法,如下所示:
case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable {
val key = word + word_pos + cooc + cooc_pos
}
object SimpleCoocurrence {
val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}
val coocList:List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)
Run Code Online (Sandbox Code Playgroud)
(*)作为指导示例提供的代码 - 未编译或测试.
| 归档时间: |
|
| 查看次数: |
3376 次 |
| 最近记录: |