标签: apache-spark-ml

在多列上使用Spark ML的OneHotEncoder

我已经能够创建一个允许我一次索引多个字符串列的管道,但是我对它们进行了编码,因为与索引不同,编码器不是估算器所以我根本不会根据OneHotEncoder示例调用文档.

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, 

OneHotEncoder}
import org.apache.spark.ml.Pipeline

val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")

val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()


//indexing columns
val stringColumns = Array("domain","size", "form_factor")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Add the rest of your pipeline like VectorAssembler and algorithm
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(df)
val df_indexed = index_model.transform(df)


//encoding columns
val indexColumns  = df_indexed.columns.filter(x => x contains "index")
val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
    cname => …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-ml

9
推荐指数
1
解决办法
7422
查看次数

SPARK,ML,Tuning,CrossValidator:访问指标

为了构建NaiveBayes多类分类器,我使用CrossValidator来选择管道中的最佳参数:

val cv = new CrossValidator()
        .setEstimator(pipeline)
        .setEstimatorParamMaps(paramGrid)
        .setEvaluator(new MulticlassClassificationEvaluator)
        .setNumFolds(10)

val cvModel = cv.fit(trainingSet)
Run Code Online (Sandbox Code Playgroud)

管道包含通常的变换器和估计器,顺序如下:Tokenizer,StopWordsRemover,HashingTF,IDF,最后是NaiveBayes.

是否可以访问为最佳模型计算的指标?

理想情况下,我想访问所有模型的指标,以了解更改参数如何改变分类的质量.但目前,最好的模型已经足够好了.

仅供参考,我使用的是Spark 1.6.0

apache-spark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
2804
查看次数

如何从UDF创建自定义Transformer?

我试图用自定义阶段创建和保存管道.我需要使用一个添加column到我DataFrameUDF.因此,我想知道是否有可能将一个UDF或类似的动作转换成一个Transformer

我的自定义UDF看起来像这样,我想学习如何使用UDF自定义Transformer.

def getFeatures(n: String) = {
    val NUMBER_FEATURES = 4  
    val name = n.split(" +")(0).toLowerCase
    ((1 to NUMBER_FEATURES)
         .filter(size => size <= name.length)
         .map(size => name.substring(name.length - size)))
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
Run Code Online (Sandbox Code Playgroud)

scala user-defined-functions apache-spark apache-spark-sql apache-spark-ml

9
推荐指数
1
解决办法
4217
查看次数

如何从Spark ML随机森林中获取与该类对应的概率

我一直在使用org.apache.spark.ml.Pipeline进行机器学习任务.了解实际概率而不仅仅是预测标签尤为重要,而且我很难得到它.在这里,我正在使用随机林进行二进制分类任务.类标签为"是"和"否".我想输出标签"是"的概率.概率作为管道输出存储在DenseVector中,例如[0.69,0.31],但我不知道哪一个对应于"是"(0.69或0.31?).我想应该有一些从labelIndexer检索它?

这是我的训练模型的任务代码

val sc = new SparkContext(new SparkConf().setAppName(" ML").setMaster("local"))
val data = .... // load data from file
val df = sqlContext.createDataFrame(data).toDF("label", "features")
val labelIndexer = new StringIndexer()
                      .setInputCol("label")
                      .setOutputCol("indexedLabel")
                      .fit(df)

val featureIndexer = new VectorIndexer()
                        .setInputCol("features")
                        .setOutputCol("indexedFeatures")
                        .setMaxCategories(2)
                        .fit(df)


// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))


// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setMaxDepth(4)
  .setMaxBins(32)

// …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-ml

9
推荐指数
1
解决办法
3227
查看次数

我们应该并行化DataFrame,就像我们在训练之前并行化Seq一样

考虑一下这里给出的代码,

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)
Run Code Online (Sandbox Code Playgroud)

假设我们使用sqlContext.read()将"training"作为数据框读取,我们是否应该做类似的事情

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
Run Code Online (Sandbox Code Playgroud)

或者,在传递dataFrame时,fit函数将自动处理并行计算/数据

问候,

scala apache-spark apache-spark-sql pyspark apache-spark-ml

9
推荐指数
2
解决办法
2万
查看次数

如何在pyspark中将密集向量的RDD转换为DataFrame?

DenseVector RDD喜欢这个

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
Run Code Online (Sandbox Code Playgroud)

我想把它转换成一个Dataframe.我试过这样的

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
Run Code Online (Sandbox Code Playgroud)

它给出了这样的错误

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-spark-ml apache-spark-mllib apache-spark-2.0

9
推荐指数
1
解决办法
5539
查看次数

ALS模型 - 预测full_u*v ^ t*v等级非常高

我正在预测批量训练模型的流程之间的评级.我正在使用此处概述的方法:ALS模型 - 如何生成full_u*v ^ t*v?

! rm -rf ml-1m.zip ml-1m
! wget --quiet http://files.grouplens.org/datasets/movielens/ml-1m.zip
! unzip ml-1m.zip
! mv ml-1m/ratings.dat .

from pyspark.mllib.recommendation import Rating

ratingsRDD = sc.textFile('ratings.dat') \
               .map(lambda l: l.split("::")) \
               .map(lambda p: Rating(
                                  user = int(p[0]), 
                                  product = int(p[1]),
                                  rating = float(p[2]), 
                                  )).cache()

from pyspark.mllib.recommendation import ALS

rank = 50
numIterations = 20
lambdaParam = 0.1
model = ALS.train(ratingsRDD, rank, numIterations, lambdaParam)
Run Code Online (Sandbox Code Playgroud)

然后提取产品功能......

import json
import numpy as np

pf = model.productFeatures()

pf_vals = pf.sortByKey().values().collect()
pf_keys …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
1260
查看次数

在线学习Spark中的LDA模型

有没有办法以在线学习的方式训练LDA模型,即.加载以前的火车模型,并用新文件更新?

machine-learning lda apache-spark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
1245
查看次数

KMeans在PySpark中聚类

我有一个包含许多列的spark数据帧'mydataframe'.我试图只在两列上运行kmeans:lat和long(纬度和经度),使用它们作为简单值).我想基于这两个列提取7个集群,然后我想将集群asignment附加到我的原始数据帧.我试过了:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)

但一段时间后我收到一个错误:

org.apache.spark.SparkException:作业因阶段失败而中止:阶段5191.0中的任务1失败4次,最近失败:阶段5191.0中丢失任务1.3(TID 260738,10.19.211.69,执行程序1):org.apache. spark.api.python.PythonException:Traceback(最近一次调用最后一次)

我试图分离并重新连接群集.结果相同.我究竟做错了什么?

非常感谢你!

machine-learning k-means pyspark apache-spark-ml apache-spark-mllib

9
推荐指数
2
解决办法
2万
查看次数

在大型记录上,Spark StringIndexer.fit非常慢

我有格式化为以下示例的大数据记录:

// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc|   123|  true|
// |abc|   345|  true|
// |abc|   567|  true|
// |def|   123|  true|
// |def|   345|  true|
// |def|   567|  true|
// |def|   789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)

cid并且itemId是字符串。

有965,964,223条记录。

我正在尝试cid使用StringIndexer以下方法将其转换为整数:

dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)

但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。

我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。

我可以进一步改善性能吗?任何帮助都感激不尽。

apache-spark apache-spark-ml apache-spark-dataset

9
推荐指数
1
解决办法
412
查看次数