Dan*_*nov 6 scala cross-join apache-spark
我有一个需要计算基于电影内容的相似性的Spark工作.有46k电影.每部电影由一组SparseVectors表示(每个矢量是电影场之一的特征向量,例如Title,Plot,Genres,Actors等).例如,对于Actors和Genres,向量显示给定的actor是否在电影中存在(1)或不存在(0).
任务是为每部电影找到前10个类似的电影.我设法在Scala中编写一个脚本来执行所有这些计算并完成工作.它适用于较小的电影集,如1000部电影,但不适用于整个数据集(内存不足等).
我进行此计算的方法是在电影数据集上使用交叉连接.然后通过仅获取movie1_id <movie2_id的行来减少问题.此时数据集仍然包含46000 ^ 2/2行,即1058000000.每行都有大量数据.
然后我计算每一行的相似度得分.在计算相似度之后,我将movie1_id相同的结果分组,并按照相似性得分的降序对它们进行排序,使用前N个项目的Window函数(类似于此处描述的方式:Spark获得每个项目的前N个最高得分结果(item1,item2) ,得分)).
问题是 - 它可以在Spark中更有效地完成吗?例如,无需执行crossJoin?
还有一个问题--Spark如何处理如此庞大的数据帧(1058000000行由多个SparseVectors组成)?是否必须一次将所有这些保留在内存中?或者它是否以某种方式逐个处理这样的数据帧?
我正在使用以下函数来计算电影矢量之间的相似性:
def intersectionCosine(movie1Vec: SparseVector, movie2Vec: SparseVector): Double = {
val a: BSV[Double] = toBreeze(movie1Vec)
val b: BSV[Double] = toBreeze(movie2Vec)
var dot: Double = 0
var offset: Int = 0
while( offset < a.activeSize) {
val index: Int = a.indexAt(offset)
val value: Double = a.valueAt(offset)
dot += value * b(index)
offset += 1
}
val bReduced: BSV[Double] = new BSV(a.index, a.index.map(i => b(i)), a.index.length)
val maga: Double = magnitude(a)
val magb: Double = magnitude(bReduced)
if (maga == 0 || magb == 0)
return 0
else
return dot / (maga * magb)
}
Run Code Online (Sandbox Code Playgroud)
Dataframe中的每一行都包含两个连接的类:
final case class MovieVecData(imdbID: Int,
Title: SparseVector,
Decade: SparseVector,
Plot: SparseVector,
Genres: SparseVector,
Actors: SparseVector,
Countries: SparseVector,
Writers: SparseVector,
Directors: SparseVector,
Productions: SparseVector,
Rating: Double
)
Run Code Online (Sandbox Code Playgroud)
只要你对近似值很好,并且不需要精确的结果(或确切的数字或结果),它就可以更有效地完成.
与我在Apache Spark中对高效字符串匹配的回答类似,您可以使用LSH,其中:
BucketedRandomProjectionLSH
接近欧氏距离.MinHashLSH
近似Jaccard距离.如果要素空间很小(或可以合理地减少)并且每个类别相对较小,您还可以手动优化代码:
explode
feature数组,用于从单个记录生成#features记录.一个最小的例子是(认为它是一个伪代码):
import org.apache.spark.ml.linalg._
// This is oversimplified. In practice don't assume only sparse scenario
val indices = udf((v: SparseVector) => v.indices)
val df = Seq(
(1L, Vectors.sparse(1024, Array(1, 3, 5), Array(1.0, 1.0, 1.0))),
(2L, Vectors.sparse(1024, Array(3, 8, 12), Array(1.0, 1.0, 1.0))),
(3L, Vectors.sparse(1024, Array(3, 5), Array(1.0, 1.0))),
(4L, Vectors.sparse(1024, Array(11, 21), Array(1.0, 1.0))),
(5L, Vectors.sparse(1024, Array(21, 32), Array(1.0, 1.0)))
).toDF("id", "features")
val possibleMatches = df
.withColumn("key", explode(indices($"features")))
.transform(df => df.alias("left").join(df.alias("right"), Seq("key")))
val closeEnough(threshold: Double) = udf((v1: SparseVector, v2: SparseVector) => intersectionCosine(v1, v2) > threshold)
possilbeMatches.filter(closeEnough($"left.features", $"right.features")).select($"left.id", $"right.id").distinct
Run Code Online (Sandbox Code Playgroud)
请注意,只有当散列/特征具有足够的选择性(并且最佳稀疏)时,这两种解决方案才值得开销.在上面显示的示例中,您只比较集合{1,2,3}和{4,5}中的行,而不是集合之间的行.
然而,在最坏的情况下(M记录,N个特征),我们可以进行NM 2比较,而不是M 2
归档时间: |
|
查看次数: |
1735 次 |
最近记录: |