标签: lsh

奇怪的性能问题 Spark LSH MinHash approxSimilarityJoin

我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。

在(内部)连接之后,数据集有点偏斜,但是每次完成一个或多个任务都需要花费过多的时间。

sparkui-1

正如您所看到的,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它进行测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但是速度太慢了。下一个最慢的任务在 14 秒内运行,有 4 倍多的记录并且实际上溢出到磁盘。

如果你看 sparkuisql

连接本身是根据 minhash 规范和 udf 计算匹配对之间的 jaccard 距离的 pos & hashValue (minhash) 上的两个数据集之间的内部连接。

分解哈希表:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)

杰卡德距离函数:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)

连接处理过的数据集:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The …
Run Code Online (Sandbox Code Playgroud)

duplicates minhash apache-spark apache-spark-mllib lsh

6
推荐指数
1
解决办法
1001
查看次数

在数据帧的所有行上应用 LSH approxNearestNeighbors

我正在尝试将 BucketedRandomProjectionLSH 的函数model.approxNearestNeighbors(df, key, n)应用于数据帧的所有行,以便大约找到每个项目的前 n 个最相似的项目。我的数据框有 100 万行。

我的问题是我必须找到一种方法在合理的时间内(不超过2小时)计算它。我已经读过该函数,approxSimilarityJoin(df, df, threshold)但该函数花费的时间太长,并且没有返回正确的行数:如果我的数据帧有 100.000 行,并且我设置了一个非常高/宽松的阈值,我会得到甚至不到 10% 的结果返回的行数。

因此,我正在考虑approxNearestNeighbors在所有行上使用,以便计算时间几乎是线性的。

如何将该函数应用于数据帧的每一行?我无法使用 UDF,因为我需要模型 + 数据帧作为输入。

你有什么建议吗 ?

python apache-spark pyspark lsh

6
推荐指数
0
解决办法
552
查看次数

如何减少由spark中的approxSimilarityJoin引起的shuffle写入?

我使用approxSimilarityJoin来找到两组之间的Jaccard相似性.

 val dfA = hashtoseq.toDF("id","values") //values is a set of string
 val hashingTF = new HashingTF().setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
 val featurizedData = hashingTF.transform(dfA)


 val mh = new MinHashLSH()
             .setNumHashTables(3)
             .setInputCol("features")
             .setOutputCol("hashes")

val model = mh.fit(featurizedData)

val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData, 0.45)
Run Code Online (Sandbox Code Playgroud)

我正在为16 GB数据集写入大约270 GB的内容,甚至在服务器上也需要超过3小时(3个工作节点,每个节点有64 GB RAM和64个内核).

我经历了以下链接: -
[ LSH Spark永远停留在approxSimilarityJoin()函数,但它对我不起作用.

我也经历过databricks网站,他们将运行时与数据大小进行了比较.对于MB中的数据,即436 MB,大约相似度需要25分钟.对于GB中的数据集,它的创建问题.[ https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html].

我们可以通过在代码/服务器配置中进行一些更改或者使用approxSimilarityJoin函数来减少这种随机写入吗?还有其他有效的方法可以在大型数据集上计算Jaccard相似度吗?

scala apache-spark lsh

5
推荐指数
0
解决办法
442
查看次数

Pyspark LSH 后跟余弦相似度

我有很多用户,每个用户都有一个关联的向量。我想计算每个用户之间的余弦相似度。从尺寸来看,这是令人望而却步的。看起来 LSH 是一个很好的近似步骤,据我所知,它将创建存储桶,在这种情况下,用户被映射到同一个存储桶,其中它们很可能是相似的。在 Pyspark 中,示例如下:

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (4, Vectors.dense([1.0, -1.0]),),
         (5, Vectors.dense([1.1, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = ss.createDataFrame(dataA, ["id", "features"])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=1.0, numHashTables=3)
model = brp.fit(dfA)
model.transform(dfA).show(truncate=False)


+---+-----------+-----------------------+
|id |features   |hashes                 |
+---+-----------+-----------------------+
|0  |[1.0,1.0]  |[[-1.0], [0.0], [-1.0]]|
|1  |[1.0,-1.0] |[[-2.0], [-2.0], [1.0]]|
|4  |[1.0,-1.0] |[[-2.0], [-2.0], [1.0]]|
|5  |[1.1,-1.0] |[[-2.0], [-2.0], [1.0]]|
|2  |[-1.0,-1.0]|[[0.0], …
Run Code Online (Sandbox Code Playgroud)

nearest-neighbor apache-spark pyspark lsh

5
推荐指数
1
解决办法
1100
查看次数