gsa*_*ras 2 scala mapreduce machine-learning distributed-computing apache-spark
这是从一个跟进的问题在这里.我正在尝试基于此实现实现k-means .它的伟大工程,但我想换成groupByKey()用reduceByKey(),但我不知道如何(我并不担心现在的表现).这是相关的缩小代码:
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val read_mean_centroids = sc.textFile("centroids.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
..
Run Code Online (Sandbox Code Playgroud)
请注意,这println(newCentroids)将给出:
地图(23 - >( - 6.269305E-4,-0.0011746404,-4.08004E-5),8 - >( - 5.108732E-4,7.336348E-4,-3.707591E-4),17 - >( - 0.0016383086 ,-0.0016974678,1.45 ..
并且println(closest):
MapPartitionsRDD [6]在地图kmeans.scala:75
相关问题:在Apache Spark(Scala)中使用reduceByKey.
一些文件:
def reduceByKey(func:(V,V)⇒V):RDD [(K,V)]
使用关联reduce函数合并每个键的值.
def reduceByKey(func:(V,V)⇒V,numPartitions:Int):RDD [(K,V)]
使用关联reduce函数合并每个键的值.
def reduceByKey(分区器:分区器,功能:(V,V)⇒V):RDD [(K,V)]
使用关联reduce函数合并每个键的值.
def groupByKey():RDD [(K,Iterable [V])]
将RDD中每个键的值分组为单个序列.
您可以使用aggregateByKey()(比reduceByKey()这更自然)这样计算newCentroids:
val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
(agg, v) => (agg._1 += v, agg._2 + 1L),
(agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)
).mapValues(agg => agg._1/agg._2).collectAsMap
Run Code Online (Sandbox Code Playgroud)
为此,您需要计算数据的维度,即dim,您只需要执行一次.你可以使用类似的东西val dim = data.first._2.length.
| 归档时间: |
|
| 查看次数: |
1222 次 |
| 最近记录: |