使用 Pyspark 将密集向量转换为数据帧

kku*_*mar 3 python dataframe pandas apache-spark

首先,我尝试了下面链接中的所有内容来修复我的错误,但没有一个起作用。

如何在pyspark中将密集向量的RDD转换为DataFrame?

我正在尝试将密集向量转换为数据框(最好是 Spark)以及列名并遇到问题。

我在 spark 数据框中的列是使用 Vector Assembler 创建的向量,我现在想将其转换回数据帧,因为我想在向量中的某些变量上创建图。

方法一:

from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.linalg import Vectors

temp=output.select("all_features")
temp.rdd.map(
    lambda row: (DenseVector(row[0].toArray()))
).toDF()
Run Code Online (Sandbox Code Playgroud)

下面是错误

TypeError: not supported type: <type 'numpy.ndarray'>
Run Code Online (Sandbox Code Playgroud)

方法二:

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.linalg import *

as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
result = output.withColumn("all_features", as_ml("all_features"))
result.head(5)
Run Code Online (Sandbox Code Playgroud)

错误:

AttributeError: 'numpy.ndarray' object has no attribute 'asML'
Run Code Online (Sandbox Code Playgroud)

我还尝试将数据帧转换为 Pandas 数据帧,之后我无法将值拆分为单独的列

方法三:

pandas_df=temp.toPandas()
pandas_df1=pd.DataFrame(pandas_df.all_features.values.tolist())
Run Code Online (Sandbox Code Playgroud)

上面的代码运行良好,但我的数据框中仍然只有一列,所有值以逗号分隔作为列表。

任何帮助是极大的赞赏!

编辑:

这是我的临时数据框的样子。它只有一列all_features。我正在尝试创建一个数据框,将所有这些值拆分为单独的列(all_features 是一个使用 200 列创建的向量)

+--------------------+
|        all_features|
+--------------------+
|[0.01193689934723...|
|[0.04774759738895...|
|[0.0,0.0,0.194417...|
|[0.02387379869447...|
|[1.89796699621085...|
+--------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

预期输出是一个数据帧,所有 200 列都在一个数据帧中分离出来

+----------------------------+
|        col1| col2| col3|...
+----------------------------+
|0.01193689934723|0.0|0.5049431301173817...
|0.04774759738895|0.0|0.1657316216149636...
|0.0|0.0|7.213126372469...
|0.02387379869447|0.0|0.1866693496827619|...
|1.89796699621085|0.0|0.3192169213385746|...
+----------------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

这是我的 Pandas DF 输出的样子

              0
0   [0.011936899347238104, 0.0, 0.5049431301173817...
1   [0.047747597388952415, 0.0, 0.1657316216149636...
2   [0.0, 0.0, 0.19441761495525278, 7.213126372469...
3   [0.023873798694476207, 0.0, 0.1866693496827619...
4   [1.8979669962108585, 0.0, 0.3192169213385746, ...
Run Code Online (Sandbox Code Playgroud)

may*_*wal 6

由于您需要单独列中的所有功能(正如我从您的编辑中获得的),因此您提供的答案的链接不是您的解决方案。

尝试这个,

#column_names
temp = temp.rdd.map(lambda x:[float(y) for y in x['all_features']]).toDF(column_names)
Run Code Online (Sandbox Code Playgroud)

编辑:

由于您temp最初是一个数据框,因此您也可以使用此方法而无需将其转换为rdd

import pyspark.sql.functions as F
from pyspark.sql.types import *

splits = [F.udf(lambda val: float(val[i].item()),FloatType()) for i in range(200)]
temp = temp.select(*[s(F.col('all_features')).alias(c) for c,s in zip(column_names,splits)])
temp.show()
Run Code Online (Sandbox Code Playgroud)