I have a feature set with a corresponding categoricalFeaturesInfo: Map[Int, Int]. However, for the life of me I cannot figure out how I am supposed to get the DecisionTree class to work. It will not accept anything but a LabeledPoint as data, yet LabeledPoint requires (double, vector), where the vector requires doubles.
val LP = featureSet.map(x => LabeledPoint(classMap(x(0)),Vectors.dense(x.tail)))
// Run training algorithm to build the model
val maxDepth: Int = 3
val isMulticlassWithCategoricalFeatures: Boolean = true
val numClassesForClassification: Int = countPossibilities(labelCol)
val model = DecisionTree.train(LP, Classification, Gini, isMulticlassWithCategoricalFeatures, maxDepth, numClassesForClassification,categoricalFeaturesInfo)
The error I get:
scala> val LP = featureSet.map(x => LabeledPoint(classMap(x(0)),Vectors.dense(x.tail)))
<console>:32: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (Array[String])
val LP = featureSet.map(x => LabeledPoint(classMap(x(0)),Vectors.dense(x.tail)))
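The compiler is complaining that x.tail is an Array[String] while Vectors.dense wants an Array[Double]. A minimal hedged sketch of the conversion, assuming every tail element parses as a number (featureSet and classMap are the asker's own values):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Convert the string features to doubles before building the dense vector.
val LP = featureSet.map(x => LabeledPoint(classMap(x(0)), Vectors.dense(x.tail.map(_.toDouble))))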
I am trying to build a very simple standalone Scala application using MLlib, but I get the following error when trying to build the program:
Object Mllib is not a member of package org.apache.spark
Then I realized that I had to add MLlib as a dependency, as follows:
version := "1"
scalaVersion :="2.10.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.1.0",
"org.apache.spark" %% "spark-mllib" % "1.1.0"
)
However, I then get this error:
unresolved dependency spark-core_2.10.4;1.1.1 : not found
So I had to modify it to
"org.apache.spark" % "spark-core_2.10" % "1.1.1",
But there is still an error saying:
unresolved dependency spark-mllib;1.1.1 : not found
Does anyone know how to add the MLlib dependency in the .sbt file?
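A hedged sketch of a build.sbt that should resolve both artifacts: the %% operator already appends the binary Scala version suffix (_2.10), so combining it with an explicit _2.10 suffix or with mismatched version numbers (1.1.0 vs 1.1.1) is a likely cause of the unresolved dependencies. The version numbers below are assumptions; the point is to keep both artifacts on one consistent Spark version:
version := "1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.1.1",  // %% appends _2.10 automatically
  "org.apache.spark" %% "spark-mllib" % "1.1.1"   // same version as spark-core
)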
I am trying to implement KMeans using Apache Spark.
val data = sc.textFile(irisDatasetString)
val parsedData = data.map(_.split(',').map(_.toDouble)).cache()
val clusters = KMeans.train(parsedData,3,numIterations = 20)
I get the following error:
error: overloaded method value train with alternatives:
(data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
(data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int)org.apache.spark.mllib.clustering.KMeansModel <and>
(data: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector],k: Int,maxIterations: Int,runs: Int,initializationMode: String)org.apache.spark.mllib.clustering.KMeansModel
cannot be applied to (org.apache.spark.rdd.RDD[Array[Double]], Int, numIterations: Int)
val clusters = KMeans.train(parsedData,3,numIterations = 20)
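Every overload listed expects an RDD[org.apache.spark.mllib.linalg.Vector] rather than an RDD[Array[Double]], so the per-row arrays have to become mllib vectors before training. A minimal hedged sketch, assuming the same comma-separated numeric input as above:
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// Parse each line straight into a dense mllib Vector instead of an Array[Double].
val parsedVectors = data.map(line => Vectors.dense(line.split(',').map(_.toDouble))).cache()
val clusters = KMeans.train(parsedVectors, 3, 20)  // (data, k, maxIterations)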
So I tried to convert the Array[Double] to a Vector as shown here:
scala> val vectorData: Vector = Vectors.dense(parsedData)
I get the following error:
error: type Vector takes type parameters
val vectorData: Vector = Vectors.dense(parsedData)
^ …
I ran a Python program on Spark using MLlib. It works fine on small datasets, but after two iterations on a large dataset I get the following error:
ERROR TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$2@15b59543 rejected from java.util.concurrent.ThreadPoolExecutor@22427929[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2701]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2050)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.apache.spark.scheduler.TaskResultGetter.enqueueSuccessfulTask(TaskResultGetter.scala:49)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree2$1$1.apply(TaskSchedulerImpl.scala:327)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$liftedTree2$1$1.apply(TaskSchedulerImpl.scala:324)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.TaskSchedulerImpl.liftedTree2$1(TaskSchedulerImpl.scala:324)
at org.apache.spark.scheduler.TaskSchedulerImpl.statusUpdate(TaskSchedulerImpl.scala:309)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:61)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:127)
at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:198)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:126)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:93)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) …
I trained a classification model in Apache Spark (using pyspark) and store it in a LogisticRegressionModel object. Now I want to make predictions on new data, so I would like to save the model and read it back into a new program to do the predictions. Any idea how to store the model? I was thinking maybe pickle, but I am new to both Python and Spark, so I would like to hear what the community thinks.
Update: I also need a decision tree classifier. To read it back, I need to import DecisionTreeModel in pyspark.
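For reference, a hedged sketch of MLlib's built-in model persistence, shown in Scala to match the other snippets on this page; model is assumed to be the trained LogisticRegressionModel, sc a SparkContext, and the path is a placeholder. Newer Spark releases expose matching save/load methods on the PySpark model classes, which avoids pickling a JVM-backed object:
import org.apache.spark.mllib.classification.LogisticRegressionModel
// Persist the trained model to a directory, then reload it in another program.
model.save(sc, "hdfs:///models/lr-model")
val sameModel = LogisticRegressionModel.load(sc, "hdfs:///models/lr-model")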
Context: I have a data frame with two columns: label and features.
org.apache.spark.sql.DataFrame = [label: int, features: vector]
where features is an mllib.linalg.VectorUDT of numeric type built using VectorAssembler.
Question: Is there a way to assign a schema to the feature vector? I would like to keep track of the name of each feature.
Tried so far:
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("feat1", "feat2", "feat3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
scala> attrGroup.toMetadata
res197: org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"f1"},{"idx":1,"name":"f2"},{"idx":2,"name":"f3"}]},"num_attrs":3}}
But I am not sure how to apply this to an existing data frame.
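One hedged way to apply it: Column.as has an overload that accepts a Metadata argument, so the AttributeGroup metadata built above can be attached by rewriting the vector column in place. In this sketch df and the column name "features" are placeholders for the data frame and vector column in the schema shown earlier, and attrGroup is the group from the snippet above:
import org.apache.spark.sql.functions.col
// Re-alias the existing vector column, attaching the attribute-group metadata to it.
val dfWithNames = df.withColumn("features", col("features").as("features", attrGroup.toMetadata()))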
I tried applying PCA to my data and then running RandomForest on the transformed data. However, PCA.transform(data) gives me a DataFrame, while I need mllib LabeledPoints to feed my RandomForest. How can I do that? My code:
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2")
val splits = dataset.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
val trainingDf = trainingData.toDF()
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(100)
.fit(trainingDf)
val pcaTrainingData = pca.transform(trainingDf)
val numClasses = 10
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 10 // Use more in practice.
val featureSubsetStrategy = "auto" // Let the algorithm choose.
val impurity = …
I want to use the pyspark.mllib.stat.Statistics.corr function to compute the correlation between two columns of a pyspark.sql.dataframe.DataFrame object. The corr function needs to take an RDD of Vectors objects. How do I convert a column df['some_name'] into an RDD of Vectors.dense objects?
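A hedged sketch of the conversion, written in Scala for consistency with the rest of this page (the PySpark calls mirror it): select the numeric columns, map each Row to a dense mllib Vector, and hand the resulting RDD to Statistics.corr. The data frame df and the column names "some_name" and "other_name" are placeholders:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics
// Build an RDD[Vector] from two numeric columns and compute the correlation matrix.
val vecRdd = df.select("some_name", "other_name").rdd.map(r => Vectors.dense(r.getDouble(0), r.getDouble(1)))
val corrMatrix = Statistics.corr(vecRdd)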
python apache-spark apache-spark-sql pyspark apache-spark-mllib
Spark now has two machine learning libraries - Spark MLlib and Spark ML. They overlap somewhat in what they implement, but as I understand it (as someone new to the whole Spark ecosystem) Spark ML is the way to go and MLlib remains mostly for backward compatibility.
My question is very specific and relates to PCA. In the MLlib implementation there seems to be a limit on the number of columns:
spark.mllib supports PCA for tall-and-skinny matrices stored in row-oriented format and any Vectors.
Also, if you look at the Java code example, there is this:
The number of columns should be small, e.g., less than 1000.
On the other hand, if you look at the ML documentation, no limitation is mentioned.
So, my question is - does this limitation also exist in Spark ML? If so, why the limit, and is there any workaround to use this implementation even when the number of columns is large?
When predicting labels with Spark ML, the resulting DataFrame is:
scala> result.show
+-----------+--------------+
|probability|predictedLabel|
+-----------+--------------+
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.1,0.9]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.0,1.0]| 0.0|
| [0.1,0.9]| 0.0|
| [0.6,0.4]| 1.0|
| [0.6,0.4]| 1.0|
| [1.0,0.0]| 1.0|
| [0.9,0.1]| 1.0|
| [0.9,0.1]| 1.0|
| [1.0,0.0]| 1.0|
| [1.0,0.0]| 1.0|
+-----------+--------------+
only showing top 20 rows
I want to create a new DataFrame with a new column called prob, which holds the first value of the Vector in the probability column of the original DataFrame, for example:
+-----------+--------------+----------+
|probability|predictedLabel| prob |
+-----------+--------------+----------+ …
scala dataframe apache-spark apache-spark-sql apache-spark-mllib
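A hedged sketch of the transformation asked for above: a UDF that returns the first element of the probability vector, attached with withColumn. It assumes result is the DataFrame shown and that the column holds org.apache.spark.mllib.linalg.Vector, as older Spark ML pipelines produced (on Spark 2.x the element type is org.apache.spark.ml.linalg.Vector instead):
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.udf
// UDF that extracts the first component of the probability vector.
val firstProb = udf { v: Vector => v(0) }
val withProb = result.withColumn("prob", firstProb(result("probability")))
withProb.show()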