Ora*_*uez 13 apache-spark apache-spark-sql pyspark apache-spark-ml apache-spark-mllib
我有一个带有元组值的RDD(String,SparseVector),我想使用RDD创建一个DataFrame.获取(label:string,features:vector)DataFrame,它是大多数ml算法库所需的Schema.我知道可以这样做,因为 当给定DataFrame的features列时,HashingTF ml Library会输出一个向量.
temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
StructField("label", DoubleType(), False),
StructField("tokens", ArrayType(StringType()), False)
]))
#assumming there is an RDD (double,array(strings))
hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")
ndf = hashingTF.transform(temp_df)
ndf.printSchema()
#outputs
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#| |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,我能以某种方式将(String,SparseVector)的RDD转换为(String,vector)的DataFrame.我试着平常,sqlContext.createDataFrame但没有DataType符合我的需求.
df = sqlContext.createDataFrame(rdd,StructType([
StructField("label" , StringType(),True),
StructField("features" , ?Type(),True)
]))
Run Code Online (Sandbox Code Playgroud)
zer*_*323 19
你必须在VectorUDT这里使用:
# In Spark 1.x
# from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.ml.linalg import SparseVector, VectorUDT
temp_rdd = sc.parallelize([
(0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
(1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])
schema = StructType([
StructField("label", DoubleType(), True),
StructField("features", VectorUDT(), True)
])
temp_rdd.toDF(schema).printSchema()
## root
## |-- label: double (nullable = true)
## |-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
只是为了完整性Scala等效:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StructType}
// In Spark 1x.
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
val schema = new StructType()
.add("label", DoubleType)
// In Spark 1.x
//.add("features", new VectorUDT())
.add("features",VectorType)
val temp_rdd: RDD[Row] = sc.parallelize(Seq(
Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))),
Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5))))
))
spark.createDataFrame(temp_rdd, schema).printSchema
// root
// |-- label: double (nullable = true)
// |-- features: vector (nullable = true)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8073 次 |
| 最近记录: |