优化Spark作业,必须为每个条目相似度计算每个条目,并为每个条目输出前N个相似项目

Dan*_*nov 6 scala cross-join apache-spark

我有一个需要计算基于电影内容的相似性的Spark工作.有46k电影.每部电影由一组SparseVectors表示(每个矢量是电影场之一的特征向量,例如Title,Plot,Genres,Actors等).例如,对于Actors和Genres,向量显示给定的actor是否在电影中存在(1)或不存在(0).

任务是为每部电影找到前10个类似的电影.我设法在Scala中编写一个脚本来执行所有这些计算并完成工作.它适用于较小的电影集,如1000部电影,但不适用于整个数据集(内存不足等).

我进行此计算的方法是在电影数据集上使用交叉连接.然后通过仅获取movie1_id <movie2_id的行来减少问题.此时数据集仍然包含46000 ^ 2/2行,即1058000000.每行都有大量数据.

然后我计算每一行的相似度得分.在计算相似度之后,我将movie1_id相同的结果分组,并按照相似性得分的降序对它们进行排序,使用前N个项目的Window函数(类似于此处描述的方式:Spark获得每个项目的前N个最高得分结果(item1,item2) ,得分)).

问题是 - 它可以在Spark中更有效地完成吗?例如,无需执行crossJoin?

还有一个问题--Spark如何处理如此庞大的数据帧(1058000000行由多个SparseVectors组成)?是否必须一次将所有这些保留在内存中?或者它是否以某种方式逐个处理这样的数据帧?


我正在使用以下函数来计算电影矢量之间的相似性:

def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)

var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
  val index: Int = a.indexAt(offset)
  val value: Double = a.valueAt(offset)

  dot += value * b(index)
  offset += 1
}

val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)

if (maga == 0 || magb == 0)
  return 0
else
  return dot / (maga * magb)
}
Run Code Online (Sandbox Code Playgroud)

Dataframe中的每一行都包含两个连接的类:

final case class MovieVecData(imdbID: Int,
                          Title: SparseVector,
                          Decade: SparseVector,
                          Plot: SparseVector,
                          Genres: SparseVector,
                          Actors: SparseVector,
                          Countries: SparseVector,
                          Writers: SparseVector,
                          Directors: SparseVector,
                          Productions: SparseVector,
                          Rating: Double
                         )
Run Code Online (Sandbox Code Playgroud)

hi-*_*zir 7

只要你对近似值很好,并且不需要精确的结果(或确切的数字或结果),它就可以更有效地完成.

与我在Apache Spark中高效字符串匹配的回答类似,您可以使用LSH,其中:

如果要素空间很小(或可以合理地减少)并且每个类别相对较小,您还可以手动优化代码:

  • explode feature数组,用于从单个记录生成#features记录.
  • 按功能,计算距离和过滤掉候选者的自连接结果(当且仅当它们共享特定的分类特征时,才会比较每对记录).
  • 使用当前代码获取最佳记录.

一个最小的例子是(认为它是一个伪代码):

import org.apache.spark.ml.linalg._

// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)

val df = Seq(
  (1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
  (2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
  (3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
  (4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
  (5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")

val possibleMatches = df
  .withColumn("key", explode(indices($"features")))
  .transform(df => df.alias("left").join(df.alias("right"), Seq("key")))

val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) =>  intersectionCosine(v1, v2) > threshold)

possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct
Run Code Online (Sandbox Code Playgroud)

请注意,只有当散列/特征具有足够的选择性(并且最佳稀疏)时,这两种解决方案才值得开销.在上面显示的示例中,您只比较集合{1,2,3}和{4,5}中的行,而不是集合之间的行.

然而,在最坏的情况下(M记录,N个特征),我们可以进行NM 2比较,而不是M 2