Dat*_*oli 15 libsvm apache-spark apache-spark-sql apache-spark-ml apache-spark-mllib
我想制作libsvm格式,所以我将数据帧设置为所需的格式,但我不知道如何转换为libsvm格式.格式如图所示.我希望所需的libsvm类型是用户项:rating.如果您知道在当前情况下该怎么做:
val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
val fields = line.split(",")
(fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey
val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}
val data_DF = data.toDF("user","item","rating")
Run Code Online (Sandbox Code Playgroud)

我正在使用Spark 2.0.
eli*_*sah 15
您面临的问题可分为以下几个方面:
LabeledPoint数据X.1.将您的评级转换为LabeledPoint数据X.
让我们考虑以下原始评级:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
Run Code Online (Sandbox Code Playgroud)
您可以将这些原始评级作为坐标列表矩阵(COO)处理.
Spark实现了由其条目的RDD支持的分布式矩阵:CoordinateMatrix其中每个条目是(i:Long,j:Long,value:Double)的元组.
注意:只有当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix.(通常是用户/项目评级的情况.)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
Run Code Online (Sandbox Code Playgroud)
现在让我们将其转换RDD[MatrixEntry]为a CoordinateMatrix并提取索引行:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
Run Code Online (Sandbox Code Playgroud)
2.以libsvm格式保存LabeledPoint数据
从Spark 2.0开始,您可以使用DataFrameWriter.让我们创建一个带有一些虚拟LabeledPoint数据的小例子(您也可以使用DataFrame我们之前创建的):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
Run Code Online (Sandbox Code Playgroud)
遗憾的是我们仍然不能DataFrameWriter直接使用它,因为虽然大多数管道组件支持向后兼容加载,但2.0之前的Spark版本中包含向量或矩阵列的一些现有DataFrame和管道可能需要迁移到新的spark.ml矢量和矩阵类型.
可以在以下情况中找到用于将DataFrame列转换mllib.linalg为ml.linalg类型(以及反之亦然)的实用程序.org.apache.spark.mllib.util.MLUtils.在我们的示例中,我们需要执行以下操作(对于虚拟数据和DataFramefrom step 1.)
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
Run Code Online (Sandbox Code Playgroud)
现在让我们保存DataFrame:
convertedVecDF.write.format("libsvm").save("data/foo")
Run Code Online (Sandbox Code Playgroud)
我们可以查看文件内容:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
Run Code Online (Sandbox Code Playgroud)
编辑:在当前版本的spark(2.1.0)中,不需要使用mllib包.您可以简单地LabeledPoint以libsvm格式保存数据,如下所示:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
17334 次 |
| 最近记录: |