我正在使用Spark 1.6.1:
目前我正在使用CrossValidator来训练我的ML管道,其中包含各种参数.在训练过程之后,我可以使用CrossValidatorModel的bestModel属性来获取在交叉验证期间表现最佳的模型.是否会自动丢弃交叉验证的其他模型,还是可以选择性能比bestModel差的模型?
我问,因为我使用F1分数指标进行交叉验证,但我也对所有模型的weighedRecall感兴趣,而不仅仅是在交叉验证期间表现最佳的模型.
val folds = 6
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new MulticlassClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(folds)
val avgF1Scores = cvModel.avgMetrics
val predictedDf = cvModel.bestModel.transform(testDf)
// Here I would like to predict as well with the other models of the cross validation
Run Code Online (Sandbox Code Playgroud) cross-validation apache-spark apache-spark-mllib apache-spark-1.6
我有一个火花数据框如下:
predictions.show(5)
+------+----+------+-----------+
| user|item|rating| prediction|
+------+----+------+-----------+
|379433| 31| 1| 0.08203495|
| 1834| 31| 1| 0.4854447|
|422635| 31| 1|0.017672742|
| 839| 31| 1| 0.39273006|
| 51444| 31| 1| 0.09795039|
+------+----+------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
预测是预测的评级,评级是隐含评级(计数).
现在我想检查我的推荐算法的AUC.
我首先尝试了pyspark.ml.BinaryClassificationEvaluator,因为它直接在数据框上工作.
# getting the evaluationa metric
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print evaluator.evaluate(predictions)
Run Code Online (Sandbox Code Playgroud)
这给了我以下错误:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-65-c642ea9c2cf5> in <module>()
4
5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
----> 6 print evaluator.evaluate(predictions)
7
8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
/Users/i854319/spark/python/pyspark/ml/evaluation.py in …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-ml apache-spark-mllib
我有一个包含许多列的spark数据帧'mydataframe'.我试图只在两列上运行kmeans:lat和long(纬度和经度),使用它们作为简单值).我想基于这两个列提取7个集群,然后我想将集群asignment附加到我的原始数据帧.我试过了:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
但一段时间后我收到一个错误:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段5191.0中的任务1失败4次,最近失败:阶段5191.0中丢失任务1.3(TID 260738,10.19.211.69,执行程序1):org.apache. spark.api.python.PythonException:Traceback(最近一次调用最后一次)
我试图分离并重新连接群集.结果相同.我究竟做错了什么?
非常感谢你!
machine-learning k-means pyspark apache-spark-ml apache-spark-mllib
我刚刚开始使用ML和Apache Spark,所以我一直在尝试基于Spark示例的线性回归.除了示例中的示例之外,我似乎无法为任何数据生成适当的模型,并且无论输入数据如何,截距始终为0.0.
我已经准备了一个基于该功能的简单训练数据集:
y =(2*x1)+(3*x2)+4
即我期望截距为4,权重为(2,3).
如果我在原始数据上运行LinearRegressionWithSGD.train(...),模型是:
Model intercept: 0.0, weights: [NaN,NaN]
Run Code Online (Sandbox Code Playgroud)
并且预测都是NaN:
Features: [1.0,1.0], Predicted: NaN, Actual: 9.0
Features: [1.0,2.0], Predicted: NaN, Actual: 12.0
Run Code Online (Sandbox Code Playgroud)
等等
如果我首先缩放数据,我得到:
Model intercept: 0.0, weights: [17.407863391511754,2.463212481736855]
Features: [1.0,1.0], Predicted: 19.871075873248607, Actual: 9.0
Features: [1.0,2.0], Predicted: 22.334288354985464, Actual: 12.0
Features: [1.0,3.0], Predicted: 24.797500836722318, Actual: 15.0
Run Code Online (Sandbox Code Playgroud)
等等
要么我做错了,要么我不明白这个模型的输出应该是什么,那么有人可以建议我在哪里出错吗?
我的代码如下:
// Load and parse the dummy data (y, x1, x2) for y = (2*x1) + (3*x2) + 4
// i.e. intercept should be 4, weights (2, 3)? …Run Code Online (Sandbox Code Playgroud) 当我尝试将df2提供给kmeans时,我收到以下错误
clusters = KMeans.train(df2, 10, maxIterations=30,
runs=10, initializationMode="random")
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
Run Code Online (Sandbox Code Playgroud)
df2是一个如下创建的数据框:
df = sqlContext.read.json("data/ALS3.json")
df2 = df.select('latitude','longitude')
df2.show()
latitude| longitude|
60.1643075| 24.9460844|
60.4686748| 22.2774728|
Run Code Online (Sandbox Code Playgroud)
如何将这两列转换为Vector并将其提供给KMeans?
我有一个包含以下列的数据框。
scala> show_times.printSchema
root
|-- account: string (nullable = true)
|-- channel: string (nullable = true)
|-- show_name: string (nullable = true)
|-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。
该数据集共有 1.33 亿行,其中 192 个不同的show_names.
对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。
我使用 Spark MLlib 的QuantileDiscretizer
目前,我循环播放每个节目并按QuantileDiscretizer顺序运行,如下面的代码所示。
我最终想要的是以下示例输入以获得示例输出。
输入示例:
account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)
示例输出:
account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)
是否有一种更有效和分布式的方法来使用groupBy类似的操作来完成此操作,而不是循环遍历每个操作show_name …
我有一个由SQL查询产生的数据帧
df1 = sqlContext.sql("select * from table_test")
Run Code Online (Sandbox Code Playgroud)
我需要将此数据帧转换为libsvm格式,以便可以将其作为输入提供
pyspark.ml.classification.LogisticRegression
Run Code Online (Sandbox Code Playgroud)
我试着做以下事情.但是,这导致了以下错误,因为我正在使用spark 1.5.2
df1.write.format("libsvm").save("data/foo")
Failed to load class for data source: libsvm
Run Code Online (Sandbox Code Playgroud)
我想改用MLUtils.loadLibSVMFile.我在防火墙后面,不能直接pip安装它.所以我下载了文件,scp-ed然后手动安装它.一切似乎工作正常,但我仍然得到以下错误
import org.apache.spark.mllib.util.MLUtils
No module named org.apache.spark.mllib.util.MLUtils
Run Code Online (Sandbox Code Playgroud)
问题1:我的上述方法是将数据帧转换为正确方向的libsvm格式.问题2:如果问题1为"是",如何让MLUtils正常工作.如果"否",将数据帧转换为libsvm格式的最佳方法是什么
apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-mllib
我正在使用Spark ML运行一些ML实验,并且在一个20MB的小型数据集(扑克数据集)和一个带参数网格的随机森林中,需要1小时30分钟才能完成。与此类似,使用scikit-learn所需的时间要少得多。
在环境方面,我正在测试2个从属服务器,每个从属服务器15GB内存,24个内核。我认为应该花这么长时间,并且我想知道问题是否出在我的代码中,因为我对Spark非常陌生。
这里是:
df = pd.read_csv(http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data)
dataframe = sqlContext.createDataFrame(df)
train, test = dataframe.randomSplit([0.7, 0.3])
columnTypes = dataframe.dtypes
for ct in columnTypes:
if ct[1] == 'string' and ct[0] != 'label':
categoricalCols += [ct[0]]
elif ct[0] != 'label':
numericCols += [ct[0]]
stages = []
for categoricalCol in categoricalCols:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
stages += [stringIndexer]
assemblerInputs = map(lambda c: c + "Index", categoricalCols) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
stages += …Run Code Online (Sandbox Code Playgroud) 我正在开发一个网站,它会根据访问者的数据向访问者推荐食谱.我正从他们的个人资料,网站活动和Facebook收集数据.
目前,我有一个像[用户名/用户标识符,食谱,年龄,性别的等级,类型(蔬菜/非蔬菜),菜(意大利/中国..等)的数据.关于上述功能,我想推荐他们没有访问过的新配方.
我已经实现了ALS(交替最小二乘)火花算法.在这里我们必须准备包含[userId,RecipesId,Rating]列的csv.然后我们必须训练这些数据,并通过调整lamdas,Rank,iteration等参数来创建模型.该模型使用pyspark生成推荐
model.recommendProducts(userId,numberOfRecommendations)
ALS算法仅接受三个功能userId,RecipesId,Rating.我无法包括多个功能(如类型,食品,性别等),从该我上面(用户ID,RecipesId,评分)提到开.我想要包含这些功能,然后训练模型并生成建议.
还有其他算法,我可以在其中包含上述参数并生成推荐.
任何帮助将不胜感激,谢谢.
我的python版本是3.6.3,spark版本是2.2.1。这是我的代码:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession.builder.appName("Data Preprocessor") \
.config("spark.some.config.option", "1") \
.getOrCreate()
dataset = spark.createDataFrame([(0, 59.0, 0.0, Vectors.dense([2.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0, 9.0, 9.0, 9.0]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
output.select("features").show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
我没有得到单个向量,而是得到以下输出:
(12,[0,2,9,10,11],[59.0,2.0,9.0,9.0,9.0])