来自数据帧的Spark MLLib Kmeans,又回来了

Mic*_*zer 9 k-means apache-spark

我的目标是使用Spark(1.3.1)MLLib将kmeans聚类算法应用于非常大的数据集.我使用来自Spark的hiveContext从HDFS调用数据,并最终希望将其放回那里 - 以这种格式

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |
Run Code Online (Sandbox Code Playgroud)

我运行了以下代码,其中"data"是双精度数据帧,第一列是ID.

    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1),s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)
Run Code Online (Sandbox Code Playgroud)

这成功运行,我现在停留在如上所述的数据帧中将群集映射回各自的ID.我可以将其转换为数据帧:

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
Run Code Online (Sandbox Code Playgroud)

但就我而言,这就是我的意思.这篇文章是在正确的轨道上,我认为这篇文章提出了类似的问题.

我怀疑需要带标签的点库.任何评论,答案将不胜感激,欢呼.

编辑:刚刚在Spark用户列表中找到这个,看起来很有希望

krc*_*rcz 10

我知道你想在最后获得DataFrame.我看到两种可能的解决方案 我会说他们之间的选择是品味问题.

从RDD创建列

以RDD的形式获取成对的id和簇非常容易:

val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
Run Code Online (Sandbox Code Playgroud)

然后从中创建DataFrame

val idCluster = idClusterRDD.toDF("id", "cluster")
Run Code Online (Sandbox Code Playgroud)

它的工作原理是因为map不会改变RDD中数据的顺序,这就是为什么你只能用预测结果压缩id.

使用UDF(用户定义函数)

第二种方法涉及使用clusters.predict方法作为UDF:

val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
    bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)
Run Code Online (Sandbox Code Playgroud)

现在我们可以使用它来为数据添加预测:

val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
Run Code Online (Sandbox Code Playgroud)

请记住,Spark API不允许UDF注销.这意味着闭包数据将保存在内存中.

错误/不理想的解决方案

  • 在没有广播的情况下使用clusters.predict

它不适用于分布式设置.编辑:实际上它会工作,我使用广播的RDD的预测实施感到困惑.

  • sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

toArray收集驱动程序中的所有数据.这意味着在分布式模式下,您将集群ID复制到一个节点中.