我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。
在(内部)连接之后,数据集有点偏斜,但是每次完成一个或多个任务都需要花费过多的时间。
正如您所看到的,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它进行测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但是速度太慢了。下一个最慢的任务在 14 秒内运行,有 4 倍多的记录并且实际上溢出到磁盘。
连接本身是根据 minhash 规范和 udf 计算匹配对之间的 jaccard 距离的 pos & hashValue (minhash) 上的两个数据集之间的内部连接。
分解哈希表:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)
杰卡德距离函数:
modelDataset.select(
struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)
连接处理过的数据集:
override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
val xSet = x.toSparse.indices.toSet
val ySet = y.toSparse.indices.toSet
val intersectionSize = xSet.intersect(ySet).size.toDouble
val unionSize = xSet.size + ySet.size - intersectionSize
assert(unionSize > 0, "The …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 BucketedRandomProjectionLSH 的函数model.approxNearestNeighbors(df, key, n)应用于数据帧的所有行,以便大约找到每个项目的前 n 个最相似的项目。我的数据框有 100 万行。
我的问题是我必须找到一种方法在合理的时间内(不超过2小时)计算它。我已经读过该函数,approxSimilarityJoin(df, df, threshold)但该函数花费的时间太长,并且没有返回正确的行数:如果我的数据帧有 100.000 行,并且我设置了一个非常高/宽松的阈值,我会得到甚至不到 10% 的结果返回的行数。
因此,我正在考虑approxNearestNeighbors在所有行上使用,以便计算时间几乎是线性的。
如何将该函数应用于数据帧的每一行?我无法使用 UDF,因为我需要模型 + 数据帧作为输入。
你有什么建议吗 ?
我使用approxSimilarityJoin来找到两组之间的Jaccard相似性.
val dfA = hashtoseq.toDF("id","values") //values is a set of string
val hashingTF = new HashingTF().setInputCol("values").setOutputCol("features").setNumFeatures(1048576)
val featurizedData = hashingTF.transform(dfA)
val mh = new MinHashLSH()
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(featurizedData)
val dffilter = model.approxSimilarityJoin(featurizedData, featurizedData, 0.45)
Run Code Online (Sandbox Code Playgroud)
我正在为16 GB数据集写入大约270 GB的内容,甚至在服务器上也需要超过3小时(3个工作节点,每个节点有64 GB RAM和64个内核).
我经历了以下链接: -
[ LSH Spark永远停留在approxSimilarityJoin()函数,但它对我不起作用.
我也经历过databricks网站,他们将运行时与数据大小进行了比较.对于MB中的数据,即436 MB,大约相似度需要25分钟.对于GB中的数据集,它的创建问题.[ https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html].
我们可以通过在代码/服务器配置中进行一些更改或者使用approxSimilarityJoin函数来减少这种随机写入吗?还有其他有效的方法可以在大型数据集上计算Jaccard相似度吗?
我有很多用户,每个用户都有一个关联的向量。我想计算每个用户之间的余弦相似度。从尺寸来看,这是令人望而却步的。看起来 LSH 是一个很好的近似步骤,据我所知,它将创建存储桶,在这种情况下,用户被映射到同一个存储桶,其中它们很可能是相似的。在 Pyspark 中,示例如下:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(4, Vectors.dense([1.0, -1.0]),),
(5, Vectors.dense([1.1, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = ss.createDataFrame(dataA, ["id", "features"])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=1.0, numHashTables=3)
model = brp.fit(dfA)
model.transform(dfA).show(truncate=False)
+---+-----------+-----------------------+
|id |features |hashes |
+---+-----------+-----------------------+
|0 |[1.0,1.0] |[[-1.0], [0.0], [-1.0]]|
|1 |[1.0,-1.0] |[[-2.0], [-2.0], [1.0]]|
|4 |[1.0,-1.0] |[[-2.0], [-2.0], [1.0]]|
|5 |[1.1,-1.0] |[[-2.0], [-2.0], [1.0]]|
|2 |[-1.0,-1.0]|[[0.0], …Run Code Online (Sandbox Code Playgroud)