我有兴趣使用Spark MLlib应用LDA主题建模.我已经检查了这里的代码和解释,但我找不到如何使用模型然后在一个新的看不见的文档中找到主题分布.
我正在使用PySpark和MLlib使用Spark 1.3.0,我需要保存并加载我的模型.我使用这样的代码(取自官方文档)
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
rank = 10
numIterations = 20
model = ALS.train(ratings, rank, numIterations)
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
predictions.collect() # shows me some predictions
model.save(sc, "model0")
# Trying to load saved model and work with it
model0 = MatrixFactorizationModel.load(sc, "model0")
predictions0 = model0.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
Run Code Online (Sandbox Code Playgroud)
在我尝试使用model0之后,我得到一个很长的回溯,结束于此:
Py4JError: An error occurred …Run Code Online (Sandbox Code Playgroud) 我有一个带有元组值的RDD(String,SparseVector),我想使用RDD创建一个DataFrame.获取(label:string,features:vector)DataFrame,它是大多数ml算法库所需的Schema.我知道可以这样做,因为 当给定DataFrame的features列时,HashingTF ml Library会输出一个向量.
temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
StructField("label", DoubleType(), False),
StructField("tokens", ArrayType(StringType()), False)
]))
#assumming there is an RDD (double,array(strings))
hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")
ndf = hashingTF.transform(temp_df)
ndf.printSchema()
#outputs
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#| |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,我能以某种方式将(String,SparseVector)的RDD转换为(String,vector)的DataFrame.我试着平常,sqlContext.createDataFrame但没有DataType符合我的需求.
df = sqlContext.createDataFrame(rdd,StructType([ …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql pyspark apache-spark-ml apache-spark-mllib
我正在尝试使用spark mllib lda来总结我的文档语料库.
我的问题设置如下.
我有16台服务器(每台有20个内核和128GB内存).
当我执行LDA OnlineLDAOptimizer,它给了内存不足的错误,提示我增加spark.driver.maxResultSize类似
的11个任务(1302 MB)序列化结果总大小比spark.driver.maxResultSize大
我增加到spark.driver.maxResultSize120GB(也spark.driver.memory增加到120GB)并重新运行LDA但不缺.它仍然说11个任务(120.1 GB)的序列化结果的总大小比spark.driver.maxResultSize大
我尝试了另外一个包含大约100,000个独特单词的数据集,并且它有效
那么,在使用Spark mllib LDA时,如何估计内存使用量?我在官方文档中找不到任何规范.
注意我使用稀疏向量来构造RDD[(Long, Vector)]传递给docuemnt ,LDA.run()但不知道spark lda是否可以在内部正确处理稀疏格式.
(编辑)我使用Scala版本的LDA.不是Python版本.
这可能是一个相关的问题,但没有给出明确的答案. Spark LDA困境 - 预测和OOM问题
(编辑)的
这是我的代码片段(要点). https://gist.github.com/lucidfrontier45/11420721c0078c5b7415
def startJob(args: RunArgs)(implicit sc: SparkContext): Unit = {
val src = sc.textFile(args.fname, minPartitions = args.n_partitions).map(_.split("\t"))
.flatMap {
// input file's format is (user_id, product_name, count)
case Array(u, p, r, t) => Some((u.toInt, p.toInt, r.toDouble))
case _ => …Run Code Online (Sandbox Code Playgroud) 我想Estimator在PySpark MLlib中构建一个简单的自定义.我在这里可以编写一个自定义的Transformer,但我不知道如何在一个Estimator.我也不明白是什么@keyword_only以及为什么我需要这么多的二传手和吸气剂.Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有.
示例模型的伪代码:
class NormalDeviation():
def __init__(self, threshold = 3):
def fit(x, y=None):
self.model = {'mean': x.mean(), 'std': x.std()]
def predict(x):
return ((x-self.model['mean']) > self.threshold * self.model['std'])
def decision_function(x): # does ml-lib support this?
Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-ml apache-spark-mllib
我很好奇,如果在最新的2.0.1版本中有类似于sklearn的 http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.StratifiedShuffleSplit.html for apache-spark.
到目前为止,我只能找到https://spark.apache.org/docs/latest/mllib-statistics.html#stratified-sampling,它似乎不适合将严重不平衡的数据集拆分为火车/测试样本.
这是我第一次使用PySpark(Spark 2),我正在尝试为Logit模型创建一个玩具数据帧.我成功运行了教程,并希望将自己的数据传递给它.
我试过这个:
%pyspark
import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.regression import LabeledPoint
df = np.concatenate([np.random.randint(0,2, size=(1000)), np.random.randn(1000), 3*np.random.randn(1000)+2, 6*np.random.randn(1000)-2]).reshape(1000,-1)
df = map(lambda x: LabeledPoint(x[0], Vectors.dense(x[1:])), df)
mydf = spark.createDataFrame(df,["label", "features"])
Run Code Online (Sandbox Code Playgroud)
但我无法摆脱:
TypeError: Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector
Run Code Online (Sandbox Code Playgroud)
我正在使用ML库作为向量,输入是一个双数组,所以请问有什么问题?根据文档应该没问题.
非常感谢.
numpy apache-spark apache-spark-sql pyspark apache-spark-mllib
我需要添加两个存储在两个文件中的矩阵.
内容latest1.txt和latest2.txt下一个str:
1 2 3 4 5 6 7 8 9
我正在阅读这些文件如下:
scala> val rows = sc.textFile(“latest1.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r1 = rows
r1: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at :14
scala> val rows = sc.textFile(“latest2.txt”).map { line => val values = line.split(‘ ‘).map(_.toDouble)
Vectors.sparse(values.length,values.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
}
scala> val r2 = rows
r2: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map …Run Code Online (Sandbox Code Playgroud) 假设我有一个DataFrame(我从HDFS上的csv读入),我想通过MLlib训练一些算法.如何将行转换为LabeledPoints或以其他方式在此数据集上使用MLlib?
我正在尝试使用Apache Spark MLlib构建一个Movie Recommender系统.我在java中编写了一个推荐代码,并且在使用spark-submit命令运行时工作正常.
我的运行命令看起来像这样
bin/spark-submit --jars /opt/poc/spark-1.3.1-bin-hadoop2.6/mllib/spark-mllib_2.10-1.0.0.jar --class "com.recommender.MovieLensALSExtended" --master local[4] /home/sarvesh/Desktop/spark-test/recommender.jar /home/sarvesh/Desktop/spark-test/ml-latest-small/ratings.csv /home/sarvesh/Desktop/spark-test/ml-latest-small/movies.csv
现在我想在真实场景中使用我的推荐器,作为一个Web应用程序,我可以在其中查询推荐器以给出一些结果.
我想构建一个Spring MVC Web应用程序,它可以与Apache Spark Context交互,并在被问到时给我结果.
我的问题是我如何构建一个与群集上运行的Apache Spark交互的应用程序.因此,当请求到达控制器时,它应该接受用户查询并获取与spark-submit控制台上的命令输出相同的结果.
据我所知,我发现我们可以使用Spark SQL,与JDBC集成.但我没有找到任何好的例子.
提前致谢.
java machine-learning spring-mvc apache-spark apache-spark-mllib