nan*_*nue 21 pca apache-spark apache-spark-sql pyspark apache-spark-ml
我正在使用pyspark(使用库)减少Spark DataFrame带有PCA模型的维度,spark ml如下所示:
pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)
Run Code Online (Sandbox Code Playgroud)
在哪里data是一个Spark DataFrame实验室,其中features一个DenseVector是3维:
data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')
Run Code Online (Sandbox Code Playgroud)
拟合后,我转换数据:
transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))
Run Code Online (Sandbox Code Playgroud)
我的问题是:如何提取此PCA的特征向量?如何计算他们解释的方差?
des*_*aut 28
[ 更新:从Spark 2.2开始,PCA和SVD都可在PySpark中使用 - 请参阅JIRA票证SPARK-6227以及用于Spark ML 2.2的PCA和PCAModel ; 以下原始答案仍适用于较旧的Spark版本.]
嗯,这似乎令人难以置信,但实际上没有办法从PCA分解中提取这些信息(至少从Spark 1.5开始).但同样,有许多类似的"抱怨" - 例如,请参阅此处,因为无法从a中提取最佳参数CrossValidatorModel.
幸运的是,几个月前,我参加了AMPLab(Berkeley)和Databricks 的"可扩展机器学习" MOOC,即Spark的创建者,我们在那里实施了一个完整的PCA管道"手工"作为家庭作业的一部分.我已经修改了我的函数(请放心,我得到了充分的信任:-),以便将数据框作为输入(而不是RDD),与您的格式相同(即DenseVectors包含数字特征的行).
我们首先需要定义一个中间函数,estimatedCovariance如下所示:
import numpy as np
def estimateCovariance(df):
"""Compute the covariance matrix for a given dataframe.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input dataframe.
"""
m = df.select(df['features']).map(lambda x: x[0]).mean()
dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean
return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
Run Code Online (Sandbox Code Playgroud)
然后,我们可以编写一个main pca函数如下:
from numpy.linalg import eigh
def pca(df, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(df)
col = cov.shape[1]
eigVals, eigVecs = eigh(cov)
inds = np.argsort(eigVals)
eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
components = eigVecs[0:k]
eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals
score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
# Return the `k` principal components, `k` scores, and all eigenvalues
return components.T, score, eigVals
Run Code Online (Sandbox Code Playgroud)
测试
让我们首先看一下现有方法的结果,使用Spark ML PCA 文档中的示例数据(将它们全部修改DenseVectors):
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
Run Code Online (Sandbox Code Playgroud)
然后,用我们的方法:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
Run Code Online (Sandbox Code Playgroud)
让我强调一点,我们在我们定义的函数中没有使用任何collect()方法 - score是RDD应该的.
请注意,我们的第二列的符号与现有方法的符号完全相反; 但这不是问题:根据(免费下载)统计学习简介,由Hastie和Tibshirani共同撰写,p.382
每个主要组件加载向量都是唯一的,直到符号翻转.这意味着两个不同的软件包将产生相同的主成分加载向量,尽管这些加载向量的符号可能不同.符号可能不同,因为每个主成分加载向量指定了p维空间中的方向:由于方向不改变,因此翻转符号无效.[...]类似地,得分向量在符号翻转之前是唯一的,因为Z的方差与-Z的方差相同.
最后,既然我们已经有了特征值,那么编写一个函数来解释方差的百分比是很简单的:
def varianceExplained(df, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(df, k)
return sum(eigenvalues[0:k])/sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
Run Code Online (Sandbox Code Playgroud)
作为测试,我们还检查在我们的示例数据中解释的方差是否为1.0,对于k = 5(因为原始数据是5维的):
varianceExplained(df,5)
# 1.0
Run Code Online (Sandbox Code Playgroud)
这应该有效地完成你的工作; 随时提出您可能需要的任何澄清.
[使用Spark 1.5.0和1.5.1开发和测试]
eli*_*sah 14
编辑:
PCA并SVD终于在这两个可用pyspark开始火花2.2.0根据本解决JIRA票SPARK-6227.
原始答案:
从理论的角度来看,@ desertnaut给出的答案实际上非常出色,但我想提出另一种方法来研究如何计算SVD并提取特征向量.
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
class SVD(JavaModelWrapper):
"""Wrapper around the SVD scala case class"""
@property
def U(self):
""" Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
u = self.call("U")
if u is not None:
return RowMatrix(u)
@property
def s(self):
"""Returns a DenseVector with singular values in descending order."""
return self.call("s")
@property
def V(self):
""" Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
return self.call("V")
Run Code Online (Sandbox Code Playgroud)
这定义了我们的SVD对象.我们现在可以使用Java Wrapper定义我们的computeSVD方法.
def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
"""
Computes the singular value decomposition of the RowMatrix.
The given row matrix A of dimension (m X n) is decomposed into U * s * V'T where
* s: DenseVector consisting of square root of the eigenvalues (singular values) in descending order.
* U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
* v: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
:param k: number of singular values to keep. We might return less than k if there are numerically zero singular values.
:param computeU: Whether of not to compute U. If set to be True, then U is computed by A * V * sigma^-1
:param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
:returns: SVD object
"""
java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
return SVD(java_model)
Run Code Online (Sandbox Code Playgroud)
现在,让我们将其应用于一个例子:
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
features = model.transform(df) # this create a DataFrame with the regular features and pca_features
# We can now extract the pca_features to prepare our RowMatrix.
pca_features = features.select("pca_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
# Once the RowMatrix is ready we can compute our Singular Value Decomposition
svd = computeSVD(mat,2,True)
svd.s
# DenseVector([9.491, 4.6253])
svd.U.rows.collect()
# [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])]
svd.V
# DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)
Run Code Online (Sandbox Code Playgroud)
您问题的最简单答案是将单位矩阵输入到您的模型中。
identity_input = [(Vectors.dense([1.0, .0, 0.0, .0, 0.0]),),(Vectors.dense([.0, 1.0, .0, .0, .0]),), \
(Vectors.dense([.0, 0.0, 1.0, .0, .0]),),(Vectors.dense([.0, 0.0, .0, 1.0, .0]),),
(Vectors.dense([.0, 0.0, .0, .0, 1.0]),)]
df_identity = sqlContext.createDataFrame(identity_input,["features"])
identity_features = model.transform(df_identity)
Run Code Online (Sandbox Code Playgroud)
这应该为您提供主要组成部分。
我认为 Eliasah 的答案在 Spark 框架方面更好,因为 Desertnaut 正在通过使用 numpy 的函数而不是 Spark 的操作来解决问题。然而,埃利亚萨的答案缺少对数据的标准化。因此,我将以下几行添加到 Eliasah 的答案中:
from pyspark.ml.feature import StandardScaler
standardizer = StandardScaler(withMean=True, withStd=False,
inputCol='features',
outputCol='std_features')
model = standardizer.fit(df)
output = model.transform(df)
pca_features = output.select("std_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
svd = computeSVD(mat,5,True)
Run Code Online (Sandbox Code Playgroud)
显然, svd.V 和 Identity_features.select("pca_features").collect() 应该具有相同的值。
我在这篇博文中总结了 PCA 及其在 Spark 和 sklearn 中的使用。
在spark 2.2+中,您现在可以轻松获得解释的方差为:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=<columns of your original dataframe>, outputCol="features")
df = assembler.transform(<your original dataframe>).select("features")
from pyspark.ml.feature import PCA
pca = PCA(k=10, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
sum(model.explainedVariance)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10416 次 |
| 最近记录: |