Aggregating sparse vectors in PySpark

ADJ*_*ADJ 5 apache-spark apache-spark-sql pyspark apache-spark-ml

I have a Hive table that contains text data and some metadata associated with each document. It looks like this:

from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer

df = sc.parallelize([
  ("1", "doc_1", "fruit is good for you"),
  ("2", "doc_2", "you should eat fruit and veggies"),
  ("2", "doc_3", "kids eat fruit but not veggies")
]).toDF(["month","doc_id", "text"])
+-----+------+--------------------+
|month|doc_id|                text|
+-----+------+--------------------+
|    1| doc_1|fruit is good for...|
|    2| doc_2|you should eat fr...|
|    2| doc_3|kids eat fruit bu...|
+-----+------+--------------------+

I want to count words by month. So far I've gone the CountVectorizer route:

# split each document's text into a list of words
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
tokenized = tokenizer.transform(df)

# learn the vocabulary and encode each document as a sparse term-count vector
cvModel = CountVectorizer().setInputCol("words").setOutputCol("features").fit(tokenized)
counted = cvModel.transform(tokenized)

which gives:

+-----+------+--------------------+--------------------+--------------------+
|month|doc_id|                text|               words|            features|
+-----+------+--------------------+--------------------+--------------------+
|    1| doc_1|fruit is good for...|[fruit, is, good,...|(12,[0,3,4,7,8],[...|
|    2| doc_2|you should eat fr...|[you, should, eat...|(12,[0,1,2,3,9,11...|
|    2| doc_3|kids eat fruit bu...|[kids, eat, fruit...|(12,[0,1,2,5,6,10...|
+-----+------+--------------------+--------------------+--------------------+
Now I want to group by month and count how many times each word occurs within each month.

How can I do that?

zer*_*323 2

*There is no built-in mechanism for Vector aggregation, but you don't need one here. Once you have tokenized the data, you can just explode the words and aggregate:

from pyspark.sql.functions import explode

(counted
    .select("month", explode("words").alias("word"))
    .groupBy("month", "word")
    .count())
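For the sample data above, this yields one row per distinct (month, word) pair; a few representative rows (ordering is not guaranteed):

+-----+-------+-----+
|month|   word|count|
+-----+-------+-----+
|    1|  fruit|    1|
|    2|  fruit|    2|
|    2|    eat|    2|
|    2|veggies|    2|
+-----+-------+-----+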

If you want to restrict the results to the vocabulary, just add a filter:

from pyspark.sql.functions import col

(counted
    .select("month", explode("words").alias("word"))
    .where(col("word").isin(cvModel.vocabulary))
    .groupBy("month", "word")
    .count())
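As a side note (not part of the original answer): if you would rather end up with a single row per month, the same exploded data can be pivoted on the word column instead. A minimal sketch, reasonable only when the vocabulary is small:

from pyspark.sql.functions import explode

# one row per month, one column per word; 0 where a word never occurs that month
(counted
    .select("month", explode("words").alias("word"))
    .groupBy("month")
    .pivot("word")
    .count()
    .na.fill(0))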

* Since Spark 2.4 we have access to Summarizer, but it wouldn't be useful here.
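For reference, a minimal sketch of what the Summarizer route would look like (assuming Spark 2.4+). It aggregates the sparse vectors element-wise, but the result is one vector per month whose indices would still have to be mapped back through cvModel.vocabulary, which is why it doesn't help here:

from pyspark.ml.stat import Summarizer
from pyspark.sql.functions import col

# element-wise mean of the count vectors within each month
counted.groupBy("month").agg(Summarizer.mean(col("features")).alias("mean_features"))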