Vis*_*667 4 scala nearest-neighbor apache-spark
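I am trying to perform a K-nearest-neighbour search using Spark.
I have an RDD[Seq[Double]], and I am planning to return an RDD[(Seq[Double], Seq[Seq[Double]])] containing each row together with its list of neighbours.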
val out = data.map(row => {
  val neighbours = data.top(num = 3)(new Ordering[Seq[Double]] {
    override def compare(a: Seq[Double], b: Seq[Double]) = {
      euclideanDistance(a, row).compare(euclideanDistance(b, row)) * (-1)
    }
  })
  (row, neighbours.toSeq)
})
When I run it with spark-submit, it gives the following error:
15/04/29 21:15:39 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.1.7): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
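I understand that nested RDDs are not possible, but how can I perform an operation like this, where each element of the RDD is compared with every other element of the same RDD?
Something like this should do it.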
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sco = new SparkContext(conf)

// k is the number of nearest neighbors required
val k = 3

// generate 5 rows of two-dimensional coordinates
val rows = List.fill(5)(List.fill(2)(Math.random))
val dataRDD = sco.parallelize(rows, 1)

// squared Euclidean distance; no need for the sqrt as we're just comparing them
def euclidean(a: List[Double], b: List[Double]): Double =
  (a zip b).map { case (x, y) => (x - y) * (x - y) }.sum

// get all pairs (note: this also pairs each point with itself, at distance 0)
val pairs = dataRDD.cartesian(dataRDD)

// case class to keep things a bit neater:
// the neighbor, and its distance from the current point
case class Entry(neighbor: List[Double], dist: Double)

// map the second element to the element and distance from the first
val pairsWithDist = pairs.map { case (x, y) => (x, Entry(y, euclidean(x, y))) }

// merge one entry of pairsWithDist into the running top-k list for this point
def mergeOne(u: List[Entry], v: Entry) = (v :: u).sortBy(_.dist).take(k)

// merge two results from different partitions
def mergeList(u: List[Entry], v: List[Entry]) = (u ::: v).sortBy(_.dist).take(k)

val nearestNeighbors = pairsWithDist
  .aggregateByKey(List[Entry]())(mergeOne, mergeList)
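To see why the two merge functions passed to aggregateByKey still produce a correct top-k list, here is a minimal sketch in plain Scala (no Spark required) that simulates two partitions with local lists. The names `TopKMergeSketch`, `partitionA`, `partitionB` and the sample distances are hypothetical and chosen only for illustration; `Entry`, `mergeOne` and `mergeList` mirror the definitions above.

```scala
object TopKMergeSketch {
  // Same shape as the Entry case class above
  case class Entry(neighbor: List[Double], dist: Double)

  val k = 3

  // Fold one entry into a running list, keeping only the k closest
  def mergeOne(u: List[Entry], v: Entry): List[Entry] =
    (v :: u).sortBy(_.dist).take(k)

  // Combine two partial lists, keeping only the k closest
  def mergeList(u: List[Entry], v: List[Entry]): List[Entry] =
    (u ::: v).sortBy(_.dist).take(k)

  // Hypothetical entries for a single key, split across two simulated partitions
  val partitionA = List(
    Entry(List(0.1, 0.1), 0.5),
    Entry(List(0.9, 0.9), 2.0),
    Entry(List(0.2, 0.3), 0.1))
  val partitionB = List(
    Entry(List(0.4, 0.4), 0.3),
    Entry(List(0.7, 0.1), 1.2))

  // Conceptually what aggregateByKey does: fold within each partition, then combine
  val partialA = partitionA.foldLeft(List.empty[Entry])(mergeOne)
  val partialB = partitionB.foldLeft(List.empty[Entry])(mergeOne)
  val combined = mergeList(partialA, partialB)

  // Reference result: sort all entries at once and take the k closest
  val expected = (partitionA ::: partitionB).sortBy(_.dist).take(k)

  def main(args: Array[String]): Unit = {
    println(combined.map(_.dist)) // prints List(0.1, 0.3, 0.5)
    assert(combined == expected)
  }
}
```

Because every partial list is truncated to k entries before it is combined, each partition contributes at most k entries per key to the final merge. The cartesian product itself still generates every pair, so this approach is best suited to data sets where that is affordable.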