scala apache-spark-sql
Initially I have a matrix:
0.0 0.4 0.4 0.0
0.1 0.0 0.0 0.7
0.0 0.2 0.0 0.3
0.3 0.0 0.0 0.0
The matrix `matrix` is converted into `normal_array` by
`val normal_array = matrix.toArray`
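One thing to watch out for: Spark's `Matrix.toArray` returns the values in column-major order, so grouping the flat array naively by row length would give you the transpose. A small pure-Scala sketch of regrouping the flat array back into rows (the flat array below is the hypothetical column-major flattening of the matrix above):

```scala
val n = 4 // number of rows (= number of columns here)
// column-major flattening of the 4x4 matrix above, as Matrix.toArray would produce
val normal_array: Array[Double] = Array(
  0.0, 0.1, 0.0, 0.3, // column p1
  0.4, 0.0, 0.2, 0.0, // column p2
  0.4, 0.0, 0.0, 0.0, // column p3
  0.0, 0.7, 0.3, 0.0) // column p4
// grouped(n) yields the columns; transposing those recovers the rows
val rows: Array[Array[Double]] = normal_array.grouped(n).toArray.transpose
// rows(0) is now the first matrix row: 0.0 0.4 0.4 0.0
```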
And I have an array of strings:
inputCols : Array[String] = Array(p1, p2, p3, p4)
I need to convert this matrix into the following data frame. (Note: the number of rows and columns in the matrix will be the same as the length of `inputCols`.)
index p1 p2 p3 p4
p1 0.0 0.4 0.4 0.0
p2 0.1 0.0 0.0 0.7
p3 0.0 0.2 0.0 0.3
p4 0.3 0.0 0.0 0.0
In Python this is easy to do with the pandas library:
arrayToDataframe = pandas.DataFrame(normal_array,columns = inputCols, index = inputCols)
But how can I do this in Scala?
You can do something like below:
//Imports needed below (assumes a SparkSession named sparkSession is in scope)
import org.apache.spark.sql.functions._
//Convert your data to a Scala Seq/List/Array
val list = Seq((0.0,0.4,0.4,0.0),(0.1,0.0,0.0,0.7),(0.0,0.2,0.0,0.3),(0.3,0.0,0.0,0.0))
//Define your Array of desired column names
val inputCols: Array[String] = Array("p1", "p2", "p3", "p4")
//Create a DataFrame from the given data; it will have generated column names like _1, _2, etc.
val df = sparkSession.createDataFrame(list)
//Get the list of generated column names from the DataFrame
val dfColumns = df.columns
//Build expressions renaming each generated column to the desired name
val query = dfColumns.zip(inputCols).map { case (oldName, newName) => s"$oldName as $newName" }
//Apply the renaming
val newDf = df.selectExpr(query: _*)
//Create a udf which takes a row index (0,1,2,3) and returns the corresponding
//column name from your given array of columns
val getIndexUDF = udf((row_no: Long) => inputCols(row_no.toInt))
//Add a temporary row_no column holding the row index, derive the index column
//from it, then drop row_no. coalesce(1) keeps all rows in a single partition so
//monotonically_increasing_id yields consecutive ids 0..n-1.
val dfWithRow = newDf.coalesce(1)
  .withColumn("row_no", monotonically_increasing_id)
  .withColumn("index", getIndexUDF(col("row_no")))
  .drop("row_no")
dfWithRow.show
Sample output:
+---+---+---+---+-----+
| p1| p2| p3| p4|index|
+---+---+---+---+-----+
|0.0|0.4|0.4|0.0| p1|
|0.1|0.0|0.0|0.7| p2|
|0.0|0.2|0.0|0.3| p3|
|0.3|0.0|0.0|0.0| p4|
+---+---+---+---+-----+
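An alternative that avoids the udf and the row-id machinery entirely: pair each row with its label up front, then name all columns in one `toDF` call. A sketch of the pure-Scala data prep (the final Spark call, using an assumed `spark` session, is shown in a comment since it needs a running session):

```scala
val inputCols = Array("p1", "p2", "p3", "p4")
val rowValues: Seq[Array[Double]] = Seq(
  Array(0.0, 0.4, 0.4, 0.0),
  Array(0.1, 0.0, 0.0, 0.7),
  Array(0.0, 0.2, 0.0, 0.3),
  Array(0.3, 0.0, 0.0, 0.0))
// pair each row of values with its label; row i gets label inputCols(i)
val labelled: Seq[(String, Array[Double])] = inputCols.toSeq.zip(rowValues)
// then, with a SparkSession in scope (not run here):
// val df = spark.createDataFrame(labelled.map { case (idx, v) =>
//   (idx, v(0), v(1), v(2), v(3)) }).toDF("index" +: inputCols: _*)
```

Building the labels locally like this also sidesteps the assumption that `monotonically_increasing_id` produces consecutive ids, which only holds within a single partition.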