我正在尝试调整使用隐式数据的ALS矩阵分解模型的参数.为此,我正在尝试使用pyspark.ml.tuning.CrossValidator来运行参数网格并选择最佳模型.我相信我的问题在于评估者,但我无法弄明白.
我可以使用回归RMSE评估器为显式数据模型工作,如下所示:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import rand
conf = SparkConf() \
.setAppName("MovieLensALS") \
.set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, …
Run Code Online (Sandbox Code Playgroud) 上下文: 我有一个包含两列的数据框:标签和功能.
org.apache.spark.sql.DataFrame = [label: int, features: vector]
Run Code Online (Sandbox Code Playgroud)
其中features是使用VectorAssembler构建的数值类型的mllib.linalg.VectorUDT.
问题: 有没有办法为特征向量分配模式?我想跟踪每个功能的名称.
到目前为止尝试过:
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("feat1", "feat2", "feat3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
Run Code Online (Sandbox Code Playgroud)
scala> attrGroup.toMetadata
res197: org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"f1"},{"idx":1,"name":"f2"},{"idx":2,"name":"f3"}]},"num_attrs":3}}
Run Code Online (Sandbox Code Playgroud)
但不确定如何将其应用于现有数据框.
Spark现在有两个机器学习库 - Spark MLlib和Spark ML.它们在实现的内容上有些重叠,但正如我所理解的那样(作为整个Spark生态系统的新手)Spark ML是可行的方式,MLlib仍然主要用于向后兼容.
我的问题非常具体,与PCA有关.在MLlib实现中,似乎存在列数的限制
spark.mllib支持PCA,用于存储以行为导向格式和任何向量的高小矩阵.
另外,如果你看一下Java代码示例,也会有这个
列数应该很小,例如小于1000.
另一方面,如果你看一下ML文档,没有提到的限制.
所以,我的问题是 - Spark ML中是否也存在这种限制?如果是这样,为什么限制,即使列数很大,是否有任何解决方法可以使用此实现?
我正在经历一种非常奇怪的行为VectorAssembler
,我想知道是否有其他人看过这个.
我的场景很简单.我从一个CSV
文件解析数据,我有一些标准Int
和Double
字段,我还计算一些额外的列.我的解析函数返回:
val joined = countPerChannel ++ countPerSource //two arrays of Doubles joined
(label, orderNo, pageNo, Vectors.dense(joinedCounts))
Run Code Online (Sandbox Code Playgroud)
我的main函数使用解析函数,如下所示:
val parsedData = rawData.filter(row => row != header).map(parseLine)
val data = sqlContext.createDataFrame(parsedData).toDF("label", "orderNo", "pageNo","joinedCounts")
Run Code Online (Sandbox Code Playgroud)
然后我用VectorAssembler
这样的:
val assembler = new VectorAssembler()
.setInputCols(Array("orderNo", "pageNo", "joinedCounts"))
.setOutputCol("features")
val assemblerData = assembler.transform(data)
Run Code Online (Sandbox Code Playgroud)
因此,当我在进入数据之前打印一行数据时,VectorAssembler
它看起来像这样:
[3.2,17.0,15.0,[0.0,0.0,0.0,0.0,3.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,4.0,0.0,0.0,2.0]]
Run Code Online (Sandbox Code Playgroud)
在VectorAssembler的转换函数之后,我打印同一行数据并得到:
[3.2,(18,[0,1,6,9,14,17],[17.0,15.0,3.0,1.0,4.0,2.0])]
Run Code Online (Sandbox Code Playgroud)
到底是怎么回事?做了VectorAssembler
什么?我已经仔细检查了所有计算,甚至按照简单的Spark示例,看不出我的代码有什么问题.你能?
我在Spark ML中使用随机森林进行多类预测.
对于spark ML中的MulticlassClassificationEvaluator(),是否可以通过每个类标签获得精确度/召回率?
目前,我只看到所有班级的精确/召回相结合.
machine-learning bigdata apache-spark apache-spark-ml multiclass-classification
我想StandardScaler
用来规范化功能.
这是我的代码:
val Array(trainingData, testData) = dataset.randomSplit(Array(0.7,0.3))
val vectorAssembler = new VectorAssembler().setInputCols(inputCols).setOutputCol("features").transform(trainingData)
val stdscaler = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false).fit(vectorAssembler)
Run Code Online (Sandbox Code Playgroud)
但是当我试图使用时,它抛出了异常 StandardScaler
[Stage 151:==> (9 + 2) / 200]16/12/28 20:13:57 WARN scheduler.TaskSetManager: Lost task 31.0 in stage 151.0 (TID 8922, slave1.hadoop.ml): org.apache.spark.SparkException: Values to assemble cannot be null.
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:159)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:142)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:142)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:97)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) …
Run Code Online (Sandbox Code Playgroud) 我在PySpark ML中创建自定义Transformer的评论部分找到了相同的讨论,但没有明确的答案.还有一个未解决的JIRA对应于:https://issues.apache.org/jira/browse/SPARK-17025.
鉴于Pyspark ML管道没有提供用于保存用python编写的自定义转换器的选项,有什么其他选项可以完成它?如何在我的python类中实现返回兼容java对象的_to_java方法?
我已经能够创建一个允许我一次索引多个字符串列的管道,但是我对它们进行了编码,因为与索引不同,编码器不是估算器所以我根本不会根据OneHotEncoder示例调用文档.
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler,
OneHotEncoder}
import org.apache.spark.ml.Pipeline
val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
cname => new StringIndexer()
.setInputCol(cname)
.setOutputCol(s"${cname}_index")
)
// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)
//encoding columns
val indexColumns = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
cname => …
Run Code Online (Sandbox Code Playgroud) 我一直在使用org.apache.spark.ml.Pipeline进行机器学习任务.了解实际概率而不仅仅是预测标签尤为重要,而且我很难得到它.在这里,我正在使用随机林进行二进制分类任务.类标签为"是"和"否".我想输出标签"是"的概率.概率作为管道输出存储在DenseVector中,例如[0.69,0.31],但我不知道哪一个对应于"是"(0.69或0.31?).我想应该有一些从labelIndexer检索它?
这是我的训练模型的任务代码
val sc = new SparkContext(new SparkConf().setAppName(" ML").setMaster("local"))
val data = .... // load data from file
val df = sqlContext.createDataFrame(data).toDF("label", "features")
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(df)
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(2)
.fit(df)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
.setInputCol("prediction")
.setOutputCol("predictedLabel")
.setLabels(labelIndexer.labels)
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
// Train a RandomForest model.
val rf = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("indexedFeatures")
.setNumTrees(10)
.setFeatureSubsetStrategy("auto")
.setImpurity("gini")
.setMaxDepth(4)
.setMaxBins(32)
// …
Run Code Online (Sandbox Code Playgroud) 假设我有40个连续(DoubleType
)变量,我已经使用了四分位数ft_quantile_discretizer
.识别所有变量的四分位数非常快,因为该函数支持一次执行多个变量.
接下来,我想要一个热门代码那些分段变量,但是目前没有一个热代码支持所有这些变量的功能.所以我通过循环遍历变量,一次一个地管道ft_string_indexer
,ft_one_hot_encoder
并sdf_separate_column
为每个分段变量.这可以完成工作.但是,随着循环的进行,它会大大减慢.我认为它的内存不足,但无法弄清楚如何编程,以便它以相同的速度在变量上执行.
如果q_vars
是连续变量的变量名称(例如40个)的字符数组,我该如何以更加火花的方式对其进行编码?
for (v in q_vars) {
data_sprk_q<-data_sprk_q %>%
ft_string_indexer(v,paste0(v,"b"),"keep",string_order_type = "alphabetAsc") %>%
ft_one_hot_encoder(paste0(v,"b"),paste0(v,"bc")) %>%
sdf_separate_column(paste0(v,"bc"),into=q_vars_cat_list[[v]])
}
Run Code Online (Sandbox Code Playgroud)
我也尝试将所有引用的变量作为单个大型管道执行,但这也没有解决问题,所以我认为它与循环本身没有任何关系.
test_text<-paste0("data_sprk_q<-data_sprk_q %>% ", paste0("ft_string_indexer('",q_vars,"',paste0('",q_vars,"','b'),'keep',string_order_type = 'alphabetAsc') %>% ft_one_hot_encoder(paste0('",q_vars,"','b'),paste0('",q_vars,"','bc')) %>% sdf_separate_column(paste0('",q_vars,"','bc'),into=",q_vars_cat_list,")",collapse=" %>% "))
eval(parse(text=test_text))
Run Code Online (Sandbox Code Playgroud)
任何帮助,将不胜感激.