我已经能够创建一个允许我一次索引多个字符串列的管道,但是我对它们进行了编码,因为与索引不同,编码器不是估算器所以我根本不会根据OneHotEncoder示例调用文档.
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
cname => …
Run Code Online (Sandbox Code Playgroud) 为了构建NaiveBayes多类分类器,我使用CrossValidator来选择管道中的最佳参数:
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(new MulticlassClassificationEvaluator)
.setNumFolds(10)
val cvModel = cv.fit(trainingSet)
Run Code Online (Sandbox Code Playgroud)
管道包含通常的变换器和估计器,顺序如下:Tokenizer,StopWordsRemover,HashingTF,IDF,最后是NaiveBayes.
是否可以访问为最佳模型计算的指标?
理想情况下,我想访问所有模型的指标,以了解更改参数如何改变分类的质量.但目前,最好的模型已经足够好了.
仅供参考,我使用的是Spark 1.6.0
我试图用自定义阶段创建和保存管道.我需要使用一个添加column
到我DataFrame
的UDF
.因此,我想知道是否有可能将一个UDF
或类似的动作转换成一个Transformer
?
我的自定义UDF
看起来像这样,我想学习如何使用UDF
自定义Transformer
.
def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}
val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
Run Code Online (Sandbox Code Playgroud) scala user-defined-functions apache-spark apache-spark-sql apache-spark-ml
我一直在使用org.apache.spark.ml.Pipeline进行机器学习任务.了解实际概率而不仅仅是预测标签尤为重要,而且我很难得到它.在这里,我正在使用随机林进行二进制分类任务.类标签为"是"和"否".我想输出标签"是"的概率.概率作为管道输出存储在DenseVector中,例如[0.69,0.31],但我不知道哪一个对应于"是"(0.69或0.31?).我想应该有一些从labelIndexer检索它?
这是我的训练模型的任务代码
val sc = new SparkContext(new SparkConf().setAppName(" ML").setMaster("local"))
val data = .... // load data from file
val df = sqlContext.createDataFrame(data).toDF("label", "features")
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(df)
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(df)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
// Train a RandomForest model.
val rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setNumTrees(10)
.setFeatureSubsetStrategy("auto")
.setImpurity("gini")
.setMaxDepth(4)
.setMaxBins(32)
// …
Run Code Online (Sandbox Code Playgroud) 考虑一下这里给出的代码,
https://spark.apache.org/docs/1.2.0/ml-guide.html
import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)
val model1 = lr.fit(training)
Run Code Online (Sandbox Code Playgroud)
假设我们使用sqlContext.read()将"training"作为数据框读取,我们是否应该做类似的事情
val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
Run Code Online (Sandbox Code Playgroud)
或者,在传递dataFrame时,fit函数将自动处理并行计算/数据
问候,
我DenseVector
RDD
喜欢这个
>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
Run Code Online (Sandbox Code Playgroud)
我想把它转换成一个Dataframe
.我试过这样的
>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
Run Code Online (Sandbox Code Playgroud)
它给出了这样的错误
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, …
Run Code Online (Sandbox Code Playgroud) apache-spark pyspark apache-spark-ml apache-spark-mllib apache-spark-2.0
我正在预测批量训练模型的流程之间的评级.我正在使用此处概述的方法:ALS模型 - 如何生成full_u*v ^ t*v?
! rm -rf ml-1m.zip ml-1m
! wget --quiet http://files.grouplens.org/datasets/movielens/ml-1m.zip
! unzip ml-1m.zip
! mv ml-1m/ratings.dat .
from pyspark.mllib.recommendation import Rating
ratingsRDD = sc.textFile('ratings.dat') \
.map(lambda l: l.split("::")) \
.map(lambda p: Rating(
user = int(p[0]),
product = int(p[1]),
rating = float(p[2]),
)).cache()
from pyspark.mllib.recommendation import ALS
rank = 50
numIterations = 20
lambdaParam = 0.1
model = ALS.train(ratingsRDD, rank, numIterations, lambdaParam)
Run Code Online (Sandbox Code Playgroud)
然后提取产品功能......
import json
import numpy as np
pf = model.productFeatures()
pf_vals = pf.sortByKey().values().collect()
pf_keys …
Run Code Online (Sandbox Code Playgroud) 有没有办法以在线学习的方式训练LDA模型,即.加载以前的火车模型,并用新文件更新?
machine-learning lda apache-spark apache-spark-ml apache-spark-mllib
我有一个包含许多列的spark数据帧'mydataframe'.我试图只在两列上运行kmeans:lat和long(纬度和经度),使用它们作为简单值).我想基于这两个列提取7个集群,然后我想将集群asignment附加到我的原始数据帧.我试过了:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
但一段时间后我收到一个错误:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段5191.0中的任务1失败4次,最近失败:阶段5191.0中丢失任务1.3(TID 260738,10.19.211.69,执行程序1):org.apache. spark.api.python.PythonException:Traceback(最近一次调用最后一次)
我试图分离并重新连接群集.结果相同.我究竟做错了什么?
非常感谢你!
machine-learning k-means pyspark apache-spark-ml apache-spark-mllib
我有格式化为以下示例的大数据记录:
// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc| 123| true|
// |abc| 345| true|
// |abc| 567| true|
// |def| 123| true|
// |def| 345| true|
// |def| 567| true|
// |def| 789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)
cid
并且itemId
是字符串。
有965,964,223条记录。
我正在尝试cid
使用StringIndexer
以下方法将其转换为整数:
dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)
但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。
我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。
我可以进一步改善性能吗?任何帮助都感激不尽。