Ker*_*ial 4 scala recommendation-engine indexoutofboundsexception apache-flink flinkml
我正在为电影做一个推荐系统,使用这里提供的MovieLens数据集:http: //grouplens.org/datasets/movielens/
为了计算这个推荐系统,我在scala中使用了Flink的ML库,特别是ALS算法(org.apache.flink.ml.recommendation.ALS).
我首先将电影的评级映射为a DataSet[(Int, Int, Double)]然后创建a trainingSet和a testSet(参见下面的代码).
我的问题是当我使用ALS.fit整个数据集的函数(所有评级)时没有错误,但是如果我只删除一个评级,则fit函数不再起作用,我不明白为什么.
你有什么想法?:)
使用的代码:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
Run Code Online (Sandbox Code Playgroud)
PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Run Code Online (Sandbox Code Playgroud)
Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
Run Code Online (Sandbox Code Playgroud)
"但如果我只删除一个评级"
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
Run Code Online (Sandbox Code Playgroud)
错误 :
06/19/2015 15:00:24 CoGroup(CoGroup at org.apache.flink.ml.recommendation.ALS $ .updateFactors(ALS.scala:570))(4/4)切换到FAILED
java.lang.ArrayIndexOutOfBoundsException:5
在org.apache.flink.ml.recommendation.ALS $ BlockRating.apply(ALS.scala:358)
在org.apache.flink.ml.recommendation.ALS $$ anon $ 111.coGroup(ALS.scala:635)
在org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
...
问题是first运算符结合setTemporaryPathFlink ALS实现的参数.为了理解这个问题,让我快速解释阻塞ALS算法的工作原理.
交替最小二乘的阻塞实现首先将给定的评级矩阵按用户方式和逐项划分为块.对于这些块,计算路由信息.该路由信息分别表示哪个用户/项块接收来自哪个项/用户块的输入.然后,启动ALS迭代.
由于Flink的底层执行引擎是并行流数据流引擎,因此它尝试以流水线方式执行尽可能多的数据流部分.这需要同时在线管道的所有操作员.这具有以下优点:Flink避免实现中间结果,这可能非常大.缺点是必须在所有正在运行的运营商之间共享可用内存.在ALS的情况下,其中各个DataSet元素(例如,用户/项块)的大小相当大,这是不期望的.
为了解决这个问题,如果你设置了一个,并不是所有的实现操作符都会同时执行temporaryPath.路径定义了中间结果的存储位置.因此,如果您已定义了临时路径,则ALS首先计算用户块的路由信息并将其写入磁盘,然后计算项块的路由信息并将其写入磁盘,最后但并非最不重要的是它启动ALS迭代,它从临时路径读取路由信息.
用户和项块的路由信息的计算都取决于给定的评级数据集.在您计算用户路由信息的情况下,它将首先读取评级数据集并first在其上应用运算符.的first运算符返回n从基础数据集-arbitrary元素.现在的问题是Flink不会存储此first操作的结果来计算项目路由信息.相反,当您开始计算项目路由信息时,Flink将从其源开始重新执行数据流.这意味着它从磁盘读取评级数据集并first再次对其应用运算符.在许多情况下,与第一次first操作的结果相比,这将为您提供不同的评级.因此,生成的路由信息不一致并ALS失败.
您可以通过实现first运算符的结果来避开问题,并将此结果用作ALS算法的输入.该对象FlinkMLTools包含一个方法persist,该方法接受a DataSet,将其写入给定路径,然后返回一个DataSet读取刚写入的新内容DataSet.这允许您分解结果数据流图.
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
Run Code Online (Sandbox Code Playgroud)
或者,您可以尝试离开未temporaryPath设置.然后以流水线方式执行所有步骤(路由信息计算和als迭代).这意味着用户和项目路由信息计算都使用由first运营商产生的相同输入数据集.
Flink社区目前正致力于将运营商的中间结果保存在内存中.这将允许固定first操作员的结果,使得它不会被计算两次,因此由于其不确定性而不会给出不同的结果.
| 归档时间: |
|
| 查看次数: |
296 次 |
| 最近记录: |