UDF将单词映射到Spark中的术语索引

Bak*_*war 5 user-defined-functions apache-spark apache-spark-sql pyspark apache-spark-ml

我正在尝试获取从LDA模型获得的术语ID的相应主题词。

这是主题的数据框架,是Spark中LDA的词分布

topics_desc=ldaModel.describeTopics(20)
topics_desc.show(1)
Run Code Online (Sandbox Code Playgroud)
+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[0, 39, 68, 43, 5...|[0.06362107696025...|
+-----+--------------------+--------------------+
only showing top 1 row
Run Code Online (Sandbox Code Playgroud)

现在,由于我们拥有termIndices而不是实际的单词,因此我想在此数据框中添加另一列,该列将是对应的termIndices的单词。

现在,因为我CountVectorizer在Spark中运行了,所以我使用该模型并获取单词数组列表,如下所示。

# Creating Term Frequency Vector for each word
cv=CountVectorizer(inputCol="words", outputCol="tf_features", minDF=2.0)
cvModel=cv.fit(swremoved_df)
Run Code Online (Sandbox Code Playgroud)

cvModel.vocabulary 给出单词列表。

所以现在这是我写来获取映射的udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

def term_to_words(termindices):
    """ To get the corresponding words from term indices

    """


    return np.array(cvModel.vocabulary)[termindices]

term_to_words_conv=udf(term_to_words)


topics=topics_desc.withColumn("topics_words",term_to_words_conv("termIndices"))
Run Code Online (Sandbox Code Playgroud)

之所以将列表转换为np数组,是因为在numpy数组中,我可以通过传递一系列不能在列表中执行的索引来建立索引。

但是我得到这个错误。我不确定为什么会这样,因为我在这里几乎不做任何事情。

Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

编辑:

所以我想到了使用mapper函数代替udf

Py4JError: An error occurred while calling o443.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
def term_to_words(x):
    """ Mapper function to get the corresponding words for the term index

    """

    row=x.asDict()
    word_list=np.array(cvModel.vocabulary)

    return (row['topic'],row['termIndices'],row['termWeights'],word_list[row[termindices]])


topics_rdd=topics_desc.rdd.map(term_to_words)
Run Code Online (Sandbox Code Playgroud)

use*_*411 5

这里有两个不同的问题:

  • CountVectorizer是Java对象的包装器。它不能序列化并与闭包一起传递。出于同样的原因,您不能在map闭包中使用它。
  • 您不能从UDF返回NumPy类型。

您可以例如:

from pyspark.sql.types import ArrayType, StringType

def indices_to_terms(vocabulary):
    def indices_to_terms(xs):
        return [vocabulary[int(x)] for x in xs]
    return udf(indices_to_terms, ArrayType(StringType()))
Run Code Online (Sandbox Code Playgroud)

用法:

topics_desc.withColumn(
    "topics_words", indices_to_terms(cvModel.vocabulary)("termIndices"))
Run Code Online (Sandbox Code Playgroud)

如果要使用NumPy数组,则tolist()在从UDF返回之前将具有use 方法。