Apache Spark 中使用 K-means 进行 tf-idf 文档聚类,将点放入一个聚类中

jav*_*vid 5 python tf-idf k-means apache-spark

我正在尝试通过预处理、生成 tf-idf 矩阵,然后应用 K 均值来完成文本文档聚类的经典工作。但是,在经典 20NewsGroup 数据集上测试此工作流程会导致大多数文档聚集到一个集群中。(我最初尝试对 20 个组中的 6 个组中的所有文档进行聚类 - 因此希望聚类成 6 个簇)。

我正在 Apache Spark 中实现此功能,因为我的目的是在数百万个文档上利用此技术。以下是在 Databricks 上用 Pyspark 编写的代码:

#declare path to folder containing 6 of 20 news group categories
path = "/mnt/%s/20news-bydate.tar/20new-bydate-train-lessFolders/*/*" % 
MOUNT_NAME

#read all the text files from the 6 folders. Each entity is an entire 
document. 
text_files = sc.wholeTextFiles(path).cache()

#convert rdd to dataframe
df = text_files.toDF(["filePath", "document"]).cache()

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer 

#tokenize the document text
tokenizer = Tokenizer(inputCol="document", outputCol="tokens")
tokenized = tokenizer.transform(df).cache()

from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="tokens", 
outputCol="stopWordsRemovedTokens")
stopWordsRemoved_df = remover.transform(tokenized).cache()

hashingTF = HashingTF (inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=200000)
tfVectors = hashingTF.transform(stopWordsRemoved_df).cache()    

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
idfModel = idf.fit(tfVectors)

tfIdfVectors = idfModel.transform(tfVectors).cache()

#note that I have also tried to use normalized data, but get the same result
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

normalizer = Normalizer(inputCol="features", outputCol="normFeatures")
l2NormData = normalizer.transform(tfIdfVectors)

from pyspark.ml.clustering import KMeans

# Trains a KMeans model.
kmeans = KMeans().setK(6).setMaxIter(20)
km_model = kmeans.fit(l2NormData)

clustersTable = km_model.transform(l2NormData)
Run Code Online (Sandbox Code Playgroud)

输出显示大多数文档都聚集到簇 0 中

ID number_of_documents_in_cluster
0    3024
3    5
1    3
5    2
2    2
4    1
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,我的大部分数据点都聚集到集群 0 中,而且我无法弄清楚我做错了什么,因为我在网上遇到的所有教程和代码都指向使用这种方法。

此外,我还尝试在 K 均值之前对 tf-idf 矩阵进行归一化,但这也会产生相同的结果。我知道余弦距离是一个更好的测量方法,但我预计在 Apache Spark 中使用标准 K 均值将提供有意义的结果。

任何人都可以帮忙确定我的代码中是否存在错误,或者我的数据集群管道中是否缺少某些内容?

先感谢您!

以下是 python 中的实现,即使具有大量最大特征,它也不会将所有文档分组在一起:

#imports
import pandas as pd
import os
import nltk
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords

import numpy as np

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans, MiniBatchKMeans 

vectorizer = TfidfVectorizer(max_features=200000, lowercase=True,
                             min_df=5, stop_words='english',
                             use_idf=True)

X = vectorizer.fit_transform(df['document'])

#Apply K-means to create cluster
from time import time

km = KMeans(n_clusters=20, init='k-means++', max_iter=20, n_init=1,
            verbose=False)

km.fit(X)

#result
3     2634
6     1720
18    1307
15     780
0      745
1      689
16     504
8      438
7      421
5      369
11     347
14     330
4      243
13     165
10     136
17     118
9      113
19     106
12      87
2       62
Run Code Online (Sandbox Code Playgroud)

我本以为在 KMeans 中尝试余弦或 Jaccard 距离之前,我们可以使用 KMeans 和欧几里德距离在 pyspark 中复制类似的东西。有什么解决方案或意见吗?

小智 2

只是一些简短的评论:

  • 一般来说,K-Means 并不是文本分析的最佳算法,因为它在高维度上表现不佳。我建议使用 LDA。
  • 使用 K-Means,如果将特征数量减少到大约 2000 个,那么您更有可能获得多个不同的聚类。(我在 Databricks CE 中 /databricks-datasets/news20.binary/data-001/training 提供的 20news 数据集上快速进行了尝试,并且能够获得不同的集群。)
  • 无关:如果将所有转换器和 K-Means 放入管道中,然后仅调用 fit() 和 transform() 一次,MLlib 代码可能会更简洁。:)

这是我根据你的代码修改后可以运行的代码。警告:我根本没有调整它,所以集群目前几乎没有用(但它确实找到了不同的集群)。

df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/training")
df.cache().count()

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures", numFeatures=2000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=20)

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])

model = pipeline.fit(df)

results = model.transform(df)
results.cache()

display(results.groupBy("prediction").count())  # Note "display" is for Databricks; use show() for OSS Apache Spark
Run Code Online (Sandbox Code Playgroud)