希望将一些R代码转换为Sparklyr,例如lmtest :: coeftest()和sandwich :: sandwich().尝试开始使用Sparklyr扩展但对Spark API来说很新并且遇到问题:(
运行Spark 2.1.1和sparklyr 0.5.5-9002
感觉第一步是使用linalg库制作DenseMatrix对象:
library(sparklyr)
library(dplyr)
sc <- spark_connect("local")
rows <- as.integer(2)
cols <- as.integer(2)
array <- c(1,2,3,4)
mat <- invoke_new(sc, "org.apache.spark.mllib.linalg.DenseMatrix",
rows, cols, array)
Run Code Online (Sandbox Code Playgroud)
这会导致错误:
Error: java.lang.Exception: No matched constructor found for class org.apache.spark.mllib.linalg.DenseMatrix
Run Code Online (Sandbox Code Playgroud)
好的,所以我得到了一个java lang异常,我很确定构造函数中的args rows和colsargs很好,但不确定最后一个,它应该是java Array.所以我尝试了几种排列:
array <- invoke_new(sc, "java.util.Arrays", c(1,2,3,4))
Run Code Online (Sandbox Code Playgroud)
但最终得到类似的错误信息......
Error: java.lang.Exception: No matched constructor found for class java.util.Arrays
Run Code Online (Sandbox Code Playgroud)
我觉得我错过了一些非常基本的东西.谁知道怎么了?
我想使用Spark的mllib.recommendation库来构建原型推荐系统.但是,我所拥有的用户数据的格式是以下格式:
AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....
Run Code Online (Sandbox Code Playgroud)
如果我想使用该mllib.recommendation库,根据Rating类的API ,用户ID必须是整数(也必须是连续的?)
看起来真实用户ID和Spark使用的数字之间必须进行某种转换.但是我该怎么做呢?
我有MatrixFactorizationModel对象.如果我在通过ALS.train(...)构建模型后立即向单个用户推荐产品,则需要300毫秒(对于我的数据和硬件).但是,如果我将模型保存到磁盘并加载回来,那么推荐需要大约2000毫秒.Spark警告:
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
Run Code Online (Sandbox Code Playgroud)
如何在加载模型后创建/设置分区器并缓存用户和产品因素?以下方法没有帮助:
model.userFeatures().cache();
model.productFeatures().cache();
Run Code Online (Sandbox Code Playgroud)
此外,我试图重新分区这些rdds并从重新分区版本创建新模型,但这也没有帮助.
我在Apache Spark中有一个带有整数数组的DataFrame,源是一组图像.我最终想在它上面做PCA,但是我在从数组中创建一个矩阵时遇到了麻烦.如何从RDD创建矩阵?
> imagerdd = traindf.map(lambda row: map(float, row.image))
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)
Traceback (most recent call last):
File "<ipython-input-21-6fdaa8cde069>", line 2, in <module>
mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)
File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
values = self._convert_to_array(values, np.float64)
File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
return np.asarray(array_like, dtype=dtype)
File "/usr/local/python/conda/lib/python2.7/site- packages/numpy/core/numeric.py", line 462, in asarray
return array(a, dtype, copy=False, order=order)
TypeError: float() argument must be a string or a number
Run Code Online (Sandbox Code Playgroud)
我从我能想到的每一种可能的安排中得到了同样的错误:
imagerdd = traindf.map(lambda row: Vectors.dense(row.image))
imagerdd = …Run Code Online (Sandbox Code Playgroud) .map()我用python中的哪个函数labeledPoints从spark数据帧创建一组?如果标签/结果不是第一列,但我可以参考其列名"状态",那么表示法是什么?
我用这个.map()函数创建了Python数据帧:
def parsePoint(line):
listmp = list(line.split('\t'))
dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
dataframe.insert(0, 'status', dataframe['accepted'])
if 'NULL' in dataframe.columns:
dataframe = dataframe.drop('NULL', axis=1)
if '' in dataframe.columns:
dataframe = dataframe.drop('', axis=1)
if 'rejected' in dataframe.columns:
dataframe = dataframe.drop('rejected', axis=1)
if 'accepted' in dataframe.columns:
dataframe = dataframe.drop('accepted', axis=1)
return dataframe
Run Code Online (Sandbox Code Playgroud)
在reduce函数重新组合了所有Pandas数据帧之后,我将其转换为Spark数据帧.
parsedData=sqlContext.createDataFrame(parsedData)
Run Code Online (Sandbox Code Playgroud)
但是现在我如何labledPoints用Python 创建呢?我想它可能是另一个.map()功能?
python pandas apache-spark apache-spark-ml apache-spark-mllib
我在理解Spark的交叉验证方面遇到了一些麻烦.我见过的任何一个例子都用它来进行参数调整,但我认为它只会进行常规的K折交叉验证吗?
我想要做的是执行k折交叉验证,其中k = 5.我想获得每个结果的准确性,然后获得平均准确度.在scikit中学习这是怎么做的,分数会给你每个折叠的结果,然后你可以使用scores.mean()
scores = cross_val_score(classifier, y, x, cv=5, scoring='accuracy')
Run Code Online (Sandbox Code Playgroud)
这就是我在Spark中的做法,paramGridBuilder是空的,因为我不想输入任何参数.
val paramGrid = new ParamGridBuilder().build()
val evaluator = new MulticlassClassificationEvaluator()
evaluator.setLabelCol("label")
evaluator.setPredictionCol("prediction")
evaluator.setMetricName("precision")
val crossval = new CrossValidator()
crossval.setEstimator(classifier)
crossval.setEvaluator(evaluator)
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(5)
val modelCV = crossval.fit(df4)
val chk = modelCV.avgMetrics
Run Code Online (Sandbox Code Playgroud)
这和scikit学习实现的做法是一样的吗?为什么这些示例在进行交叉验证时会使用培训/测试数据?
classification machine-learning cross-validation apache-spark-mllib
尝试构建ML时出现以下错误Pipeline:
pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).'
Run Code Online (Sandbox Code Playgroud)
我的features列包含一个浮点值数组.听起来我需要将它们转换为某种类型的向量(它不是稀疏的,所以是DenseVector?).有没有办法直接在DataFrame上执行此操作,还是需要转换为RDD?
python apache-spark pyspark apache-spark-ml apache-spark-mllib
KMeans有几个参数用于训练,初始化模式默认为kmeans ||.问题是它快速(少于10分钟)前进到前13个阶段,然后完全挂起,不会产生错误!
再现问题的最小示例(如果我使用1000点或随机初始化,它将成功):
from pyspark.context import SparkContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
sc = SparkContext(appName='kmeansMinimalExample')
# same with 10000 points
data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
C = KMeans.train(data, 8192, maxIterations=10)
sc.stop()
Run Code Online (Sandbox Code Playgroud)
这项工作什么都不做(它没有成功,失败或进展......),如下所示."执行者"选项卡中没有活动/失败的任务.Stdout和Stderr Logs没有特别有趣的东西:
如果我使用k=81,而不是8192,它将成功:
请注意,这两个电话takeSample(),不应该是一个问题,因为有在随机初始化的情况下打了两次电话.
那么,发生了什么?Spark的Kmeans 无法扩展吗?有人知道吗?你可以重现吗?
如果这是一个内存问题,我会像以前一样得到警告和错误.
注意:placeybordeaux的注释基于在客户端模式下执行作业,其中驱动程序的配置无效,导致退出代码143等(请参阅编辑历史记录),而不是群集模式,其中根本没有报告错误,应用程序只是挂起.
从零到323:为什么Spark Mllib KMeans算法非常慢?是相关的,但我认为他目睹了一些进展,而我的确悬而未决,我确实发表评论......
我使用Spark Scala来计算Dataframe行之间的余弦相似度.
数据帧格式如下
root
|-- SKU: double (nullable = true)
|-- Features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
以下数据框的示例
+-------+--------------------+
| SKU| Features|
+-------+--------------------+
| 9970.0|[4.7143,0.0,5.785...|
|19676.0|[5.5,0.0,6.4286,4...|
| 3296.0|[4.7143,1.4286,6....|
|13658.0|[6.2857,0.7143,4....|
| 1.0|[4.2308,0.7692,5....|
| 513.0|[3.0,0.0,4.9091,5...|
| 3753.0|[5.9231,0.0,4.846...|
|14967.0|[4.5833,0.8333,5....|
| 2803.0|[4.2308,0.0,4.846...|
|11879.0|[3.1429,0.0,4.5,4...|
+-------+--------------------+
Run Code Online (Sandbox Code Playgroud)
我试图转置矩阵并检查以下提到的链接.Apache Spark Python Cosine与DataFrames 的相似性,计算 - 余弦相似性 - 通过-text-into-vector-using-tf-idf但我相信有一个更好的解决方案
我尝试了下面的示例代码
val irm = new IndexedRowMatrix(inClusters.rdd.map {
case (v,i:Vector) => IndexedRow(v, i)
}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities
Run Code Online (Sandbox Code Playgroud)
但我得到了以下错误
Error:(80, 12) constructor cannot be instantiated to expected type;
found : (T1, T2)
required: …Run Code Online (Sandbox Code Playgroud) 我是火花新手。我正在使用以下配置集在 Spark 独立版 (v3.0.0) 中编写机器学习算法:
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.set("spark.driver.memory", "8g");
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.memory.fraction", "0.6");
conf.set("spark.memory.storageFraction", "0.5");
conf.set("spark.sql.shuffle.partitions", "5");
conf.set("spark.memory.offHeap.enabled", "false");
conf.set("spark.reducer.maxSizeInFlight", "96m");
conf.set("spark.shuffle.file.buffer", "256k");
conf.set("spark.sql.debug.maxToStringFields", "100");
Run Code Online (Sandbox Code Playgroud)
这就是我创建 CrossValidator 的方式
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(gbt.maxBins(), new int[]{50})
.addGrid(gbt.maxDepth(), new int[]{2, 5, 10})
.addGrid(gbt.maxIter(), new int[]{5, 20, 40})
.addGrid(gbt.minInfoGain(), new double[]{0.0d, .1d, .5d})
.build();
CrossValidator gbcv = new CrossValidator()
.setEstimator(gbt)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(gbevaluator)
.setNumFolds(5)
.setParallelism(8)
.setSeed(session.getArguments().getTrainingRandom());
Run Code Online (Sandbox Code Playgroud)
问题是,当(在 paramGrid 中) maxDepth 只是 {2, 5} 和 maxIter {5, 20} 时,一切都工作得很好,但是当它像上面的代码中那样时,它会不断记录: ,其中 …