Har*_*pta 9 apache-spark pyspark apache-spark-ml apache-spark-mllib apache-spark-2.0
I have a DenseVector RDD like this:
>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]
I want to convert it to a DataFrame. I tried it like this:
>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()
It gives an error like this:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
schema = _infer_schema(first)
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
fields = [StructField(k, _infer_type(v), True) for k, v in items]
File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>
Old solution:
frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))
Edit 1 - Reproducible code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector
sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')
sentenceData = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", r"\s+"))
sentenceData.show()
vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
use*_*411 11
You cannot convert an RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
Otherwise Spark will try to convert the object's __dict__ and create a row using unsupported NumPy arrays as fields:
from pyspark.ml.linalg import DenseVector
from pyspark.sql.types import _infer_schema
v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError Traceback (most recent call last)
...
TypeError: not supported type: <class 'numpy.ndarray'>
compared with:
_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))
Notes:
In Spark 2.0 you have to use the correct local types:

- pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
- pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.

These two namespaces are no longer compatible and require explicit conversions (see, for example, How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT).
The code provided in the edit is not equivalent to the one from the original question. You should be aware that tuple and list don't have the same semantics. If you map a vector to a pair, use a tuple and convert directly to a DataFrame:
tfidf.rdd.map(
lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF()
Using tuples (product types) would work for nested structures as well, but I doubt this is what you want:
(tfidf.rdd
.map(lambda row: (row[0], DenseVector(row[1].toArray())))
.map(lambda x: (x, ))
.toDF())
A list at any level other than the top-level row is interpreted as ArrayType.
It is much cleaner to perform the conversion with a UDF (see Spark Python: Standard scaler error "Do not support ... SparseVector").
Views: 5539