Asked by Pra*_*ain · tags: java, scala, apache-spark, spark-streaming, apache-spark-sql
I have a particular use case where I have multiple rows for the same customer, and each row object looks like:
root
-c1: BigInt
-c2: String
-c3: Double
-c4: Double
-c5: Map[String, Int]
Now I want to group by column c1 and collect all rows for the same customer into a list, e.g.:
c1, [Row1, Row3, Row4]
c2, [Row2, Row5]
I tried this:
dataset.withColumn("combined", array("c1","c2","c3","c4","c5")).groupBy("c1").agg(collect_list("combined"))
but I get an exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c1`, `c2`, `c3`, `c4`, `c5`)' due to data type mismatch: input to function array should all be the same type, but it's [bigint, string, double, double, map<string,map<string,double>>];;
Instead of array, you can use the struct function to combine the columns (struct accepts columns of mixed types, whereas array requires them all to be the same type), then aggregate with groupBy and collect_list:
import org.apache.spark.sql.functions._
df.withColumn("combined", struct("c1","c2","c3","c4","c5"))
.groupBy("c1").agg(collect_list("combined").as("combined_list"))
.show(false)
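Outside Spark, the same group-and-collect idea can be sketched in plain Scala, with a case class standing in for the struct column (the Combined class and the sample values below are illustrative, not from the original data):

```scala
// Each row becomes one combined value (like struct("c1",...,"c5")),
// then rows are bucketed by c1 (like groupBy("c1") + collect_list).
case class Combined(c1: Int, c2: String, c3: Double, c4: Double, c5: Map[String, Int])

val rows = Seq(
  Combined(1, "a", 1.0, 2.0, Map("x" -> 1)),
  Combined(2, "b", 3.0, 4.0, Map("y" -> 2)),
  Combined(1, "c", 5.0, 6.0, Map("z" -> 3))
)

// groupBy(_.c1) mirrors groupBy("c1"); each value in the resulting Map
// mirrors the collect_list("combined") array for that customer.
val grouped: Map[Int, Seq[Combined]] = rows.groupBy(_.c1)

// Rows 1 and 3 share c1 = 1, so they end up in the same bucket.
println(grouped(1).map(_.c2))
```

This is only a model of the semantics; in Spark the grouping runs distributed and the combined column keeps the struct schema shown below.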
which gives your grouped dataset a schema like:
root
|-- c1: integer (nullable = false)
|-- combined_list: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- c1: integer (nullable = false)
| | |-- c2: string (nullable = true)
| | |-- c3: string (nullable = true)
| | |-- c4: string (nullable = true)
| | |-- c5: map (nullable = true)
| | | |-- key: string
| | | |-- value: integer (valueContainsNull = false)
I hope the answer is helpful.