Pyspark:计算向量列的余弦相似度的最快方法是什么

whs*_*s2k 6 python dataframe apache-spark apache-spark-sql pyspark

Pyspark 初学者问题在这里!我有一个大约 2M 行已矢量化文本的数据框(通过 w2v;300 维)。计算每行相对于新的单向量输入的余弦距离的最有效方法是什么?

我当前的方法使用 udf 并需要几分钟,对于我想要创建的 web 应用程序来说太长了。

创建样本 df:

import numpy as np
import pandas as pd
from pyspark.sql.functions import *

column=[]
num_rows = 10000 #change to 2000000 to really slow your computer down!
for x in range(num_rows):
    sample = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
    column.append(sample)
index = range(1000)
df_pd = pd.DataFrame([index, column]).T
#df_pd = pd.concat([df.T[x] for x in df.T], ignore_index=True)
df_pd.head()
df = spark.createDataFrame(df_pd).withColumnRenamed('0', 'Index').withColumnRenamed('1', 'Vectors')
df.show()
Run Code Online (Sandbox Code Playgroud)

创建一个示例输入(我将其创建为 Spark df,以便通过现有管道进行转换):

new_input = np.random.uniform(low=-1, high=1, size=(300,)).tolist()
df_pd_new = pd.DataFrame([[new_input]])
df_new = spark.createDataFrame(df_pd_new, ['Input_Vector'])
df_new.show()
Run Code Online (Sandbox Code Playgroud)

计算向量和 new_input 之间的余弦距离或相似度:

value = df_new.select('Input_Vector').collect()[0][0]
def cos_sim(vec):
    if (np.linalg.norm(value) * np.linalg.norm(vec)) !=0:
        dot_value = np.dot(value, vec) / (np.linalg.norm(value)*np.linalg.norm(vec))
        return dot_value.tolist()
cos_sim_udf = udf(cos_sim, FloatType())

#df_all_cos = df_all.withColumn('cos_dis', dot_product_udf('w2v')).dropna(subset='cos_dis')
df_cos = df.withColumn('cos_dis', cos_sim_udf('Vectors')).dropna(subset='cos_dis')
df_cos.show()
Run Code Online (Sandbox Code Playgroud)

最后让我们拿出最大 5 个索引来娱乐一下:

max_values = df_cos.select('index','cos_dis').orderBy('cos_dis', ascending=False).limit(5).collect()
top_indicies = []
for x in max_values:
    top_indicies.append(x[0])
print top_indicies
Run Code Online (Sandbox Code Playgroud)

不存在余弦距离的 pyspark 函数(这是理想的),所以我不确定如何加快速度。任何想法都非常感激!

bzu*_*bzu 1

您可以尝试使用 pandas_udf 而不是 udf:

# other imports
from pyspark.sql.pandas.functions import pandas_udf

# make sure arrow is actually used
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

def cos_sim2(vec: pd.Series) -> pd.Series:
    value_norm = np.linalg.norm(value)
    cs_value = vec.apply(lambda v: np.dot(value, v) / (np.linalg.norm(v) * value_norm))
    return cs_value.replace(np.inf, np.nan)

cos_sim_udf = pandas_udf(cos_sim2, FloatType())
Run Code Online (Sandbox Code Playgroud)