标签: apache-spark-mllib

Matrix数学与Sparklyr

希望将一些R代码转换为Sparklyr,例如lmtest :: coeftest()和sandwich :: sandwich().尝试开始使用Sparklyr扩展但对Spark API来说很新并且遇到问题:(

运行Spark 2.1.1和sparklyr 0.5.5-9002

感觉第一步是使用linalg库制作DenseMatrix对象:

library(sparklyr)
library(dplyr)
sc <- spark_connect("local")

rows <- as.integer(2)
cols <- as.integer(2)
array <- c(1,2,3,4)

mat <- invoke_new(sc, "org.apache.spark.mllib.linalg.DenseMatrix", 
                  rows, cols, array)
Run Code Online (Sandbox Code Playgroud)

这会导致错误:

Error: java.lang.Exception: No matched constructor found for class org.apache.spark.mllib.linalg.DenseMatrix
Run Code Online (Sandbox Code Playgroud)

好的,所以我得到了一个java lang异常,我很确定构造函数中的args rowscolsargs很好,但不确定最后一个,它应该是java Array.所以我尝试了几种排列:

array <- invoke_new(sc, "java.util.Arrays", c(1,2,3,4))
Run Code Online (Sandbox Code Playgroud)

但最终得到类似的错误信息......

Error: java.lang.Exception: No matched constructor found for class java.util.Arrays
Run Code Online (Sandbox Code Playgroud)

我觉得我错过了一些非常基本的东西.谁知道怎么了?

r apache-spark apache-spark-mllib sparklyr

12
推荐指数
1
解决办法
644
查看次数

如果用户id是字符串而不是连续的整数,如何使用mllib.recommendation?

我想使用Spark的mllib.recommendation库来构建原型推荐系统.但是,我所拥有的用户数据的格式是以下格式:

AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....
Run Code Online (Sandbox Code Playgroud)

如果我想使用该mllib.recommendation库,根据Rating类的API ,用户ID必须是整数(也必须是连续的?)

看起来真实用户ID和Spark使用的数字之间必须进行某种转换.但是我该怎么做呢?

recommendation-engine apache-spark apache-spark-mllib

11
推荐指数
2
解决办法
4757
查看次数

正确保存/加载MatrixFactorizationModel

我有MatrixFactorizationModel对象.如果我在通过ALS.train(...)构建模型后立即向单个用户推荐产品,则需要300毫秒(对于我的数据和硬件).但是,如果我将模型保存到磁盘并加载回来,那么推荐需要大约2000毫秒.Spark警告:

15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.
Run Code Online (Sandbox Code Playgroud)

如何在加载模型后创建/设置分区器并缓存用户和产品因素?以下方法没有帮助:

model.userFeatures().cache();
model.productFeatures().cache();
Run Code Online (Sandbox Code Playgroud)

此外,我试图重新分区这些rdds并从重新分区版本创建新模型,但这也没有帮助.

apache-spark apache-spark-mllib

11
推荐指数
1
解决办法
704
查看次数

Apache Spark:如何从DataFrame创建矩阵?

我在Apache Spark中有一个带有整数数组的DataFrame,源是一组图像.我最终想在它上面做PCA,但是我在从数组中创建一个矩阵时遇到了麻烦.如何从RDD创建矩阵?

> imagerdd = traindf.map(lambda row: map(float, row.image))
> mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)
Traceback (most recent call last):

  File "<ipython-input-21-6fdaa8cde069>", line 2, in <module>
mat = DenseMatrix(numRows=206456, numCols=10, values=imagerdd)

  File "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 815, in __init__
values = self._convert_to_array(values, np.float64)

  File     "/usr/local/spark/current/python/lib/pyspark.zip/pyspark/mllib/linalg.py", line 806, in _convert_to_array
    return np.asarray(array_like, dtype=dtype)

  File "/usr/local/python/conda/lib/python2.7/site-        packages/numpy/core/numeric.py", line 462, in asarray
    return array(a, dtype, copy=False, order=order)

TypeError: float() argument must be a string or a number
Run Code Online (Sandbox Code Playgroud)

我从我能想到的每一种可能的安排中得到了同样的错误:

imagerdd = traindf.map(lambda row: Vectors.dense(row.image))
imagerdd = …
Run Code Online (Sandbox Code Playgroud)

python matrix apache-spark pyspark apache-spark-mllib

11
推荐指数
1
解决办法
1万
查看次数

在Python中使用Spark DataFrame创建labeledPoints

.map()我用python中的哪个函数labeledPoints从spark数据帧创建一组?如果标签/结果不是第一列,但我可以参考其列名"状态",那么表示法是什么?

我用这个.map()函数创建了Python数据帧:

def parsePoint(line):
    listmp = list(line.split('\t'))
    dataframe = pd.DataFrame(pd.get_dummies(listmp[1:]).sum()).transpose()
    dataframe.insert(0, 'status', dataframe['accepted'])
    if 'NULL' in dataframe.columns:
        dataframe = dataframe.drop('NULL', axis=1)  
    if '' in dataframe.columns:
        dataframe = dataframe.drop('', axis=1)  
    if 'rejected' in dataframe.columns:
        dataframe = dataframe.drop('rejected', axis=1)  
    if 'accepted' in dataframe.columns:
        dataframe = dataframe.drop('accepted', axis=1)  
    return dataframe 
Run Code Online (Sandbox Code Playgroud)

在reduce函数重新组合了所有Pandas数据帧之后,我将其转换为Spark数据帧.

parsedData=sqlContext.createDataFrame(parsedData)
Run Code Online (Sandbox Code Playgroud)

但是现在我如何labledPoints用Python 创建呢?我想它可能是另一个.map()功能?

python pandas apache-spark apache-spark-ml apache-spark-mllib

11
推荐指数
1
解决办法
1万
查看次数

Spark K-fold交叉验证

我在理解Spark的交叉验证方面遇到了一些麻烦.我见过的任何一个例子都用它来进行参数调整,但我认为它只会进行常规的K折交叉验证吗?

我想要做的是执行k折交叉验证,其中k = 5.我想获得每个结果的准确性,然后获得平均准确度.在scikit中学习这是怎么做的,分数会给你每个折叠的结果,然后你可以使用scores.mean()

scores = cross_val_score(classifier, y, x, cv=5, scoring='accuracy')
Run Code Online (Sandbox Code Playgroud)

这就是我在Spark中的做法,paramGridBuilder是空的,因为我不想输入任何参数.

val paramGrid = new ParamGridBuilder().build()
val evaluator = new MulticlassClassificationEvaluator()
  evaluator.setLabelCol("label")
  evaluator.setPredictionCol("prediction")
evaluator.setMetricName("precision")


val crossval = new CrossValidator()
crossval.setEstimator(classifier)
crossval.setEvaluator(evaluator) 
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(5)


val modelCV = crossval.fit(df4)
val chk = modelCV.avgMetrics
Run Code Online (Sandbox Code Playgroud)

这和scikit学习实现的做法是一样的吗?为什么这些示例在进行交叉验证时会使用培训/测试数据?

如何交叉验证RandomForest模型?

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala

classification machine-learning cross-validation apache-spark-mllib

11
推荐指数
1
解决办法
4988
查看次数

如何在PySpark DataFrame中将ArrayType转换为DenseVector?

尝试构建ML时出现以下错误Pipeline:

pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).'
Run Code Online (Sandbox Code Playgroud)

我的features列包含一个浮点值数组.听起来我需要将它们转换为某种类型的向量(它不是稀疏的,所以是DenseVector?).有没有办法直接在DataFrame上执行此操作,还是需要转换为RDD?

python apache-spark pyspark apache-spark-ml apache-spark-mllib

11
推荐指数
1
解决办法
5057
查看次数

Spark的KMeans无法处理bigdata吗?

KMeans有几个参数用于训练,初始化模式默认为kmeans ||.问题是它快速(少于10分钟)前进到前13个阶段,然后完全挂起,不会产生错误!

再现问题的最小示例(如果我使用1000点或随机初始化,它将成功):

from pyspark.context import SparkContext

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs


if __name__ == "__main__":
    sc = SparkContext(appName='kmeansMinimalExample')

    # same with 10000 points
    data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
    C = KMeans.train(data, 8192,  maxIterations=10)    

    sc.stop()
Run Code Online (Sandbox Code Playgroud)

这项工作什么都不做(它没有成功,失败或进展......),如下所示."执行者"选项卡中没有活动/失败的任务.Stdout和Stderr Logs没有特别有趣的东西:

在此输入图像描述

如果我使用k=81,而不是8192,它将成功:

在此输入图像描述

请注意,这两个电话takeSample(),不应该是一个问题,因为有在随机初始化的情况下打了两次电话.

那么,发生了什么?Spark的Kmeans 无法扩展吗?有人知道吗?你可以重现吗?


如果这是一个内存问题,我会像以前一样得到警告和错误.

注意:placeybordeaux的注释基于在客户端模式下执行作业,其中驱动程序的配置无效,导致退出代码143等(请参阅编辑历史记录),而不是群集模式,其中根本没有报告错误,应用程序只是挂起.


从零到323:为什么Spark Mllib KMeans算法非常慢?是相关的,但我认为他目睹了一些进展,而我的确悬而未决,我确实发表评论......

在此输入图像描述

python bigdata k-means apache-spark apache-spark-mllib

11
推荐指数
1
解决办法
2645
查看次数

计算余弦相似度Spark数据帧

我使用Spark Scala来计算Dataframe行之间的余弦相似度.

数据帧格式如下

root
    |-- SKU: double (nullable = true)
    |-- Features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)

以下数据框的示例

    +-------+--------------------+
    |    SKU|            Features|
    +-------+--------------------+
    | 9970.0|[4.7143,0.0,5.785...|
    |19676.0|[5.5,0.0,6.4286,4...|
    | 3296.0|[4.7143,1.4286,6....|
    |13658.0|[6.2857,0.7143,4....|
    |    1.0|[4.2308,0.7692,5....|
    |  513.0|[3.0,0.0,4.9091,5...|
    | 3753.0|[5.9231,0.0,4.846...|
    |14967.0|[4.5833,0.8333,5....|
    | 2803.0|[4.2308,0.0,4.846...|
    |11879.0|[3.1429,0.0,4.5,4...|
    +-------+--------------------+
Run Code Online (Sandbox Code Playgroud)

我试图转置矩阵并检查以下提到的链接.Apache Spark Python Cosine与DataFrames 的相似性,计算 - 余弦相似性 - 通过-text-into-vector-using-tf-idf但我相信有一个更好的解决方案

我尝试了下面的示例代码

val irm = new IndexedRowMatrix(inClusters.rdd.map {
  case (v,i:Vector) => IndexedRow(v, i)


}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities
Run Code Online (Sandbox Code Playgroud)

但我得到了以下错误

Error:(80, 12) constructor cannot be instantiated to expected type;
 found   : (T1, T2)
 required: …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-mllib

11
推荐指数
1
解决办法
3732
查看次数

Spark v3.0.0 - 警告 DAGScheduler:广播大小为 xx 的大型任务二进制文件

我是火花新手。我正在使用以下配置集在 Spark 独立版 (v3.0.0) 中编写机器学习算法:

SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.set("spark.driver.memory", "8g");
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.memory.fraction", "0.6");
conf.set("spark.memory.storageFraction", "0.5");
conf.set("spark.sql.shuffle.partitions", "5");
conf.set("spark.memory.offHeap.enabled", "false");
conf.set("spark.reducer.maxSizeInFlight", "96m");
conf.set("spark.shuffle.file.buffer", "256k");
conf.set("spark.sql.debug.maxToStringFields", "100");
Run Code Online (Sandbox Code Playgroud)

这就是我创建 CrossValidator 的方式

ParamMap[] paramGrid = new ParamGridBuilder()
            .addGrid(gbt.maxBins(), new int[]{50})
            .addGrid(gbt.maxDepth(), new int[]{2, 5, 10})
            .addGrid(gbt.maxIter(), new int[]{5, 20, 40})
            .addGrid(gbt.minInfoGain(), new double[]{0.0d, .1d, .5d})
            .build();

    CrossValidator gbcv = new CrossValidator()
            .setEstimator(gbt)
            .setEstimatorParamMaps(paramGrid)
            .setEvaluator(gbevaluator)
            .setNumFolds(5)
            .setParallelism(8)
            .setSeed(session.getArguments().getTrainingRandom());
Run Code Online (Sandbox Code Playgroud)

问题是,当(在 paramGrid 中) maxDepth 只是 {2, 5} 和 maxIter {5, 20} 时,一切都工作得很好,但是当它像上面的代码中那样时,它会不断记录: ,其中 …

java apache-spark apache-spark-ml apache-spark-mllib

11
推荐指数
1
解决办法
2万
查看次数