计算pyspark中两个数据帧的行之间的距离

J.S*_*J.S 5 python performance apache-spark apache-spark-sql pyspark

我有两个数据框,一个用于用户配置文件,一个用于项目配置文件。

df_client = sqlContext.createDataFrame([('c1',0,1,3),('c2',1,0,3)], ['client_id','col1','col2','col3'])
df_item = sqlContext.createDataFrame([('it1',0,1,3),('it2',1,0,3)], ['item_id','col1','col2','col3'])
Run Code Online (Sandbox Code Playgroud)

我想计算用户和项目之间的余弦相似度,并获得如下的最终数据帧:

df_final.show()
      client_id item_id  distance
0        c1     it1       0
1        c1     it2       0.1
2        c2     it1       0.1
3        c2     it2       0
Run Code Online (Sandbox Code Playgroud)

但实际上有 1100 万用户、150 个项目和 150 个列。因此,我开发了三个解决方案,但每个解决方案都有一个需要一些时间的步骤。

解决方案示例:

list_item= df_item.rdd.collect()

def cosine_distance(v):

    list_item_distance = []

    for row in list_item:
        distance =  round(float(cosine(np.array(v[1:]),np.array(row[1:]))),4)
        list_item_distance.append((v["client_id"],row["item_id"],distance))

    return list_item_distance

rdd_final = df_client.rdd.map(lambda row: cosine_distance(row))

list_final = rdd_final.reduce(lambda x,y: x+y)
Run Code Online (Sandbox Code Playgroud)

但减少时间太长

问题是要得到像数据框这样的结果吗?有人有快速实现这项工作的解决方案吗?

Ass*_*son 1

我要做的第一件事是将列转换为数组。由于没有直接的方法,您可以执行 df.create_map(colnames) 将列转换为映射,然后选择一个顺序并创建一个 udf 以将其放入数组中。

接下来,我会将项目 df 标记为广播(150 行和约 150 列并不算太大)并进行连接。这可能是最长的部分。

然后我将获取两个数组并计算它们之间的余弦距离