加快Spark MLLib中大型数据集的协同过滤

Rai*_*eld 6 scala collaborative-filtering apache-spark apache-spark-mllib

我正在使用MLlib的矩阵分解来向用户推荐项目.我有一个很大的隐式交互矩阵,M = 2000万用户和N = 50k项目.在训练模型之后,我想获得每个用户的推荐列表(例如200).我尝试过recommendProductsForUsers,MatrixFactorizationModel但它非常慢(跑了9个小时,但距离完成还很远.我正在测试50个执行器,每个都有8g内存).这可能是预期的,因为recommendProductsForUsers需要计算所有M*N用户项目交互并获得每个用户的顶部.

我会尝试使用更多的执行程序,但是从我在Spark UI上的应用程序细节中看到的,我怀疑它可以在几小时或一天完成,即使我有1000个执行程序(9小时之后它仍然在flatmap这里https:// github. com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L279-L289,10000个总任务,只有~200个完成)还有其他的东西除了增加执行者数量之外,我可以调整以加快推荐过程吗?

这是示例代码:

val data = input.map(r => Rating(r.getString(0).toInt, r.getString(1).toInt, r.getLong(2))).cache
val rank = 20
val alpha = 40
val maxIter = 10
val lambda = 0.05
val checkpointIterval = 5
val als = new ALS()
    .setImplicitPrefs(true)
    .setCheckpointInterval(checkpointIterval)
    .setRank(rank)
    .setAlpha(alpha)
    .setIterations(maxIter)
    .setLambda(lambda)
val model = als.run(ratings)
val recommendations = model.recommendProductsForUsers(200)
recommendations.saveAsTextFile(outdir)
Run Code Online (Sandbox Code Playgroud)

Sha*_*dow 1

@Jack Lei:你找到这个问题的答案了吗?我自己尝试了一些事情,但只起到了一点作用。

例如:我尝试过

javaSparkContext.setCheckpointDir("checkpoint/");
Run Code Online (Sandbox Code Playgroud)

这很有帮助,因为它避免了中间的重复计算。

还尝试为每个执行程序添加更多内存和开销火花内存

--conf spark.driver.maxResultSize=5g --conf spark.yarn.executor.memoryOverhead=4000
Run Code Online (Sandbox Code Playgroud)