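I am trying to write a program in Spark to perform Latent Dirichlet Allocation (LDA). This Spark documentation page provides a good example of running LDA on sample data. Here is the program: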
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) …

This question is aimed at people familiar with py4j who can help resolve a pickling error. I am trying to add a method to PySpark's PythonMLLibAPI that accepts an RDD of namedtuples, does some work, and returns the result as an RDD.
This method is modeled after the PythonMLLibAPI.trainALSModel() method, whose analogous existing portions are:
def trainALSModel(
    ratingsJRDD: JavaRDD[Rating],
    .. )
The existing Python Rating class that the new code is modeled on is:
class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    def __reduce__(self):
        return Rating, (int(self.user), int(self.product), float(self.rating))
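To make the role of `__reduce__` concrete, here is a minimal sketch that uses only the standard library. It is not PySpark's serialization path (which goes through py4j and Spark's own picklers); it only shows the contract that `pickle` relies on: `__reduce__` returns a callable plus arguments, and the object is rebuilt by calling that callable with those arguments. The string inputs are an assumption chosen to make the type coercion visible.

```python
from collections import namedtuple


class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    """Same shape as the Rating class quoted above."""

    def __reduce__(self):
        # Describe how to rebuild the object: call Rating(int, int, float).
        # The fields are coerced to plain Python types on the way out.
        return Rating, (int(self.user), int(self.product), float(self.rating))


# Fields passed as strings so the coercion is visible.
original = Rating("1", "2", "3.5")

# pickle stores this (callable, args) pair and, on load, calls callable(*args).
factory, args = original.__reduce__()
restored = factory(*args)

print(restored)
```

A consequence of this contract is that whatever deserializes the data must be able to resolve the callable and accept the argument types that `__reduce__` produces.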
Here is my attempt, along with the relevant classes.
The new Python class pyspark.mllib.clustering.MatrixEntry:
from collections import namedtuple

class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
    def __reduce__(self):
        return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
The new method foobarRdd in PythonMLLibAPI:
def foobarRdd(
    data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
  val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value) }
  rdd
}
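For readers who want to check the arithmetic without a Spark build, the mapping in the Scala method can be sketched in plain Python. The `Entry` and `FooBarResult` tuples below are hypothetical stand-ins: the question does not show how `FooBarResult` is defined, and the name `score` for its fourth field is an assumption. Note that the Scala code reads fields `i`, `j`, and `value`, whereas the Python MatrixEntry class names them `x`, `y`, and `weight`; the sketch follows the Scala names.

```python
from collections import namedtuple

# Hypothetical stand-ins; field names follow the Scala snippet (i, j, value).
Entry = namedtuple("Entry", ["i", "j", "value"])
# "score" is an assumed name for the fourth, computed field.
FooBarResult = namedtuple("FooBarResult", ["i", "j", "value", "score"])


def foobar(entries):
    """Mirror the Scala map: the fourth field is i * 100 + j * 10 + value."""
    return [
        FooBarResult(e.i, e.j, e.value, e.i * 100 + e.j * 10 + e.value)
        for e in entries
    ]


results = foobar([Entry(1, 2, 3.0), Entry(4, 5, 0.5)])
for r in results:
    print(r)
```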
Now let's try it out:
from pyspark.mllib.clustering …