Posted by Pra*_*ain

Collecting rows by group with Apache Spark

I have a particular use case where I have multiple rows for the same customer, and each row looks like:

root
 -c1: BigInt
 -c2: String
 -c3: Double
 -c4: Double
 -c5: Map[String, Int]
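For reference, a minimal sketch of how a Dataset with this schema might be built (the local SparkSession setup and the sample values are assumptions added for illustration, not from the original post):

import org.apache.spark.sql.SparkSession

// Local session only so the example runs standalone; master("local[*]") is an assumption.
val spark = SparkSession.builder().master("local[*]").appName("collect-rows").getOrCreate()
import spark.implicits._

// Hypothetical sample rows matching the schema above (values invented for illustration).
val dataset = Seq(
  (1L, "a", 1.0, 2.0, Map("k" -> 1)),
  (1L, "b", 3.0, 4.0, Map("k" -> 2)),
  (2L, "c", 5.0, 6.0, Map("k" -> 3))
).toDF("c1", "c2", "c3", "c4", "c5")

dataset.printSchema()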

Now I want to group by column c1 and collect all rows for the same customer into a list, for example (where each key is a distinct value of c1):

c1, [Row1, Row3, Row4]
c2, [Row2, Row5]

I tried dataset.withColumn("combined", array("c1","c2","c3","c4","c5")).groupBy("c1").agg(collect_list("combined")), but I get an exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c1`, `c2`, `c3`, `c4`, `c5`)' due to data type mismatch: input to function array should all be the same type, but it's [bigint, string, double, double, map<string,map<string,double>>];;
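A sketch of one possible fix is to wrap the columns in a struct rather than an array: struct fields, unlike array elements, may have different types, so the type-mismatch error goes away. This assumes the goal is simply to carry all five columns through the aggregation (the dataset variable and the output column name "rows" follow the illustration above):

import org.apache.spark.sql.functions.{struct, collect_list}

// struct() accepts columns of mixed types, unlike array().
val grouped = dataset
  .withColumn("combined", struct("c1", "c2", "c3", "c4", "c5"))
  .groupBy("c1")
  .agg(collect_list("combined").as("rows"))

grouped.show(truncate = false)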

java scala apache-spark spark-streaming apache-spark-sql

6 votes · 1 answer · 5862 views