标签: apache-spark-mllib

Spark CrossValidatorModel访问其他型号比bestModel?

我正在使用Spark 1.6.1:

目前我正在使用CrossValidator来训练我的ML管道,其中包含各种参数.在训练过程之后,我可以使用CrossValidatorModel的bestModel属性来获取在交叉验证期间表现最佳的模型.是否会自动丢弃交叉验证的其他模型,还是可以选择性能比bestModel差的模型?

我问,因为我使用F1分数指标进行交叉验证,但我也对所有模型的weighedRecall感兴趣,而不仅仅是在交叉验证期间表现最佳的模型.

val folds = 6
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new MulticlassClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(folds)

val avgF1Scores = cvModel.avgMetrics

val predictedDf = cvModel.bestModel.transform(testDf)

// Here I would like to predict as well with the other models of the cross validation
Run Code Online (Sandbox Code Playgroud)

cross-validation apache-spark apache-spark-mllib apache-spark-1.6

9
推荐指数
2
解决办法
3007
查看次数

Spark Ml评估方法

我有一个火花数据框如下:

predictions.show(5)
+------+----+------+-----------+
|  user|item|rating| prediction|
+------+----+------+-----------+
|379433|  31|     1| 0.08203495|
|  1834|  31|     1|  0.4854447|
|422635|  31|     1|0.017672742|
|   839|  31|     1| 0.39273006|
| 51444|  31|     1| 0.09795039|
+------+----+------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

预测是预测的评级,评级是隐含评级(计数).

现在我想检查我的推荐算法的AUC.

我首先尝试了pyspark.ml.BinaryClassificationEvaluator,因为它直接在数据框上工作.

# getting the evaluationa metric 

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print evaluator.evaluate(predictions)
Run Code Online (Sandbox Code Playgroud)

这给了我以下错误:

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-65-c642ea9c2cf5> in <module>()
      4 
      5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
----> 6 print evaluator.evaluate(predictions)
      7 
      8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

/Users/i854319/spark/python/pyspark/ml/evaluation.py in …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
2321
查看次数

KMeans在PySpark中聚类

我有一个包含许多列的spark数据帧'mydataframe'.我试图只在两列上运行kmeans:lat和long(纬度和经度),使用它们作为简单值).我想基于这两个列提取7个集群,然后我想将集群asignment附加到我的原始数据帧.我试过了:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)

但一段时间后我收到一个错误:

org.apache.spark.SparkException:作业因阶段失败而中止:阶段5191.0中的任务1失败4次,最近失败:阶段5191.0中丢失任务1.3(TID 260738,10.19.211.69,执行程序1):org.apache. spark.api.python.PythonException:Traceback(最近一次调用最后一次)

我试图分离并重新连接群集.结果相同.我究竟做错了什么?

非常感谢你!

machine-learning k-means pyspark apache-spark-ml apache-spark-mllib

9
推荐指数
2
解决办法
2万
查看次数

Spark MLLib线性回归模型截距始终为0.0?

我刚刚开始使用ML和Apache Spark,所以我一直在尝试基于Spark示例的线性回归.除了示例中的示例之外,我似乎无法为任何数据生成适当的模型,并且无论输入数据如何,截距始终为0.0.

我已经准备了一个基于该功能的简单训练数据集:

y =(2*x1)+(3*x2)+4

即我期望截距为4,权重为(2,3).

如果我在原始数据上运行LinearRegressionWithSGD.train(...),模型是:

Model intercept: 0.0, weights: [NaN,NaN]
Run Code Online (Sandbox Code Playgroud)

并且预测都是NaN:

Features: [1.0,1.0], Predicted: NaN, Actual: 9.0
Features: [1.0,2.0], Predicted: NaN, Actual: 12.0
Run Code Online (Sandbox Code Playgroud)

等等

如果我首先缩放数据,我得到:

Model intercept: 0.0, weights: [17.407863391511754,2.463212481736855]

Features: [1.0,1.0], Predicted: 19.871075873248607, Actual: 9.0
Features: [1.0,2.0], Predicted: 22.334288354985464, Actual: 12.0
Features: [1.0,3.0], Predicted: 24.797500836722318, Actual: 15.0
Run Code Online (Sandbox Code Playgroud)

等等

要么我做错了,要么我不明白这个模型的输出应该是什么,那么有人可以建议我在哪里出错吗?

我的代码如下:

   // Load and parse the dummy data (y, x1, x2) for y = (2*x1) + (3*x2) + 4
   // i.e. intercept should be 4, weights (2, 3)? …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-mllib

8
推荐指数
2
解决办法
6981
查看次数

如何将类型Row转换为Vector以提供给KMeans

当我尝试将df2提供给kmeans时,我收到以下错误

clusters = KMeans.train(df2, 10, maxIterations=30,
                        runs=10, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
Run Code Online (Sandbox Code Playgroud)

df2是一个如下创建的数据框:

df = sqlContext.read.json("data/ALS3.json")
df2 = df.select('latitude','longitude')

df2.show()


     latitude|       longitude|

   60.1643075|      24.9460844|
   60.4686748|      22.2774728|
Run Code Online (Sandbox Code Playgroud)

如何将这两列转换为Vector并将其提供给KMeans?

k-means apache-spark pyspark pyspark-sql apache-spark-mllib

8
推荐指数
2
解决办法
1万
查看次数

如何在 DataFrame 中跨组使用 QuantileDiscretizer?

我有一个包含以下列的数据框。

scala> show_times.printSchema
root
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- show_name: string (nullable = true)
 |-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。

该数据集共有 1.33 亿行,其中 192 个不同的show_names.

对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。

我使用 Spark MLlib 的QuantileDiscretizer

目前,我循环播放每个节目并按QuantileDiscretizer顺序运行,如下面的代码所示。

在此输入图像描述

我最终想要的是以下示例输入以获得示例输出。

输入示例:

account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)

示例输出:

account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)

是否有一种更有效和分布式的方法来使用groupBy类似的操作来完成此操作,而不是循环遍历每个操作show_name …

scala apache-spark apache-spark-sql apache-spark-mllib

8
推荐指数
1
解决办法
1984
查看次数

将数据帧转换为libsvm格式

我有一个由SQL查询产生的数据帧

df1 = sqlContext.sql("select * from table_test")
Run Code Online (Sandbox Code Playgroud)

我需要将此数据帧转换为libsvm格式,以便可以将其作为输入提供

pyspark.ml.classification.LogisticRegression
Run Code Online (Sandbox Code Playgroud)

我试着做以下事情.但是,这导致了以下错误,因为我正在使用spark 1.5.2

df1.write.format("libsvm").save("data/foo")
Failed to load class for data source: libsvm
Run Code Online (Sandbox Code Playgroud)

我想改用MLUtils.loadLibSVMFile.我在防火墙后面,不能直接pip安装它.所以我下载了文件,scp-ed然后手动安装它.一切似乎工作正常,但我仍然得到以下错误

import org.apache.spark.mllib.util.MLUtils
No module named org.apache.spark.mllib.util.MLUtils
Run Code Online (Sandbox Code Playgroud)

问题1:我的上述方法是将数据帧转换为正确方向的libsvm格式.问题2:如果问题1为"是",如何让MLUtils正常工作.如果"否",将数据帧转换为libsvm格式的最佳方法是什么

apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-mllib

8
推荐指数
1
解决办法
8213
查看次数

使用RandomForest的Spark ML管道在20MB数据集上花费的时间太长

我正在使用Spark ML运行一些ML实验,并且在一个20MB的小型数据集(扑克数据集)和一个带参数网格的随机森林中,需要1小时30分钟才能完成。与此类似,使用scikit-learn所需的时间要少得多。

在环境方面,我正在测试2个从属服务器,每个从属服务器15GB内存,24个内核。我认为应该花这么长时间,并且我想知道问题是否出在我的代码中,因为我对Spark非常陌生。

这里是:

df = pd.read_csv(http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data)
dataframe = sqlContext.createDataFrame(df)

train, test = dataframe.randomSplit([0.7, 0.3])

columnTypes = dataframe.dtypes

for ct in columnTypes:
    if ct[1] == 'string' and ct[0] != 'label':
        categoricalCols += [ct[0]]
    elif ct[0] != 'label':
        numericCols += [ct[0]]

stages = []

for categoricalCol in categoricalCols:

    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")

stages += [stringIndexer]

assemblerInputs = map(lambda c: c + "Index", categoricalCols) + numericCols

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

stages += [assembler]

labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')

stages += …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-spark-ml apache-spark-mllib

8
推荐指数
1
解决办法
2574
查看次数

在机器学习中我应该使用哪种算法来推荐,基于评级,类型,性别等不同的功能

我正在开发一个网站,它会根据访问者的数据向访问者推荐食谱.我正从他们的个人资料,网站活动和Facebook收集数据.

目前,我有一个像[用户名/用户标识符,食谱,年龄,性别的等级,类型(蔬菜/非蔬菜),菜(意大利/中国..等)的数据.关于上述功能,我想推荐他们没有访问过的新配方.

我已经实现了ALS(交替最小二乘)火花算法.在这里我们必须准备包含[userId,RecipesId,Rating]列的csv.然后我们必须训练这些数据,并通过调整lamdas,Rank,iteration等参数来创建模型.该模型使用pyspark生成推荐

model.recommendProducts(userId,numberOfRecommendations)

ALS算法仅接受三个功能userId,RecipesId,Rating.我无法包括多个功能(如类型,食品,性别等),从该我上面(用户ID,RecipesId,评分)提到开.我想要包含这些功能,然后训练模型并生成建议.

还有其他算法,我可以在其中包含上述参数并生成推荐.

任何帮助将不胜感激,谢谢.

machine-learning pyspark apache-spark-mllib data-science

8
推荐指数
0
解决办法
366
查看次数

Pyspark 中的向量汇编程序正在创建多个向量的元组而不是单个向量,如何解决问题?

我的python版本是3.6.3,spark版本是2.2.1。这是我的代码:

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession.builder.appName("Data Preprocessor") \
        .config("spark.some.config.option", "1") \
        .getOrCreate()

dataset = spark.createDataFrame([(0, 59.0, 0.0, Vectors.dense([2.0, 0.0, 
          0.0, 0.0, 0.0, 0.0, 0.0, 9.0, 9.0, 9.0]), 1.0)],
          ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"], 
outputCol="features")

output = assembler.transform(dataset)
output.select("features").show(truncate=False)
Run Code Online (Sandbox Code Playgroud)

我没有得到单个向量,而是得到以下输出:

(12,[0,2,9,10,11],[59.0,2.0,9.0,9.0,9.0])

python apache-spark pyspark apache-spark-mllib

8
推荐指数
1
解决办法
1867
查看次数