scala apache-spark apache-spark-sql
I need to merge two dataframes and combine their map columns by key. Both dataframes have the same schema, for example:
root
|-- id: String (nullable = true)
|-- cMap: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
I want to group by "id" and aggregate the "cMap" maps together, deduplicating keys. I tried this code:
val df = df_a.unionAll(df_b).groupBy("id").agg(collect_list("cMap") as "cMap").
  rdd.map(x => {
    var map = Map[String, String]()
    x.getAs[Seq[Map[String, String]]]("cMap").foreach(y =>
      y.foreach(tuple => {
        val key = tuple._1
        val value = tuple._2
        if (!map.contains(key)) // deduplicate
          map += (key -> value)
      }))
    Row(x.getAs[String]("id"), map)
  })
But it seems that collect_list cannot be used with a map column:
org.apache.spark.sql.AnalysisException: No handler for Hive udf class org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList because: Only primitive type arguments are accepted but map<string,string> was passed as parameter 1..;
Is there any other solution?
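One possible workaround on Spark 2.x, where collect_list rejects map columns, is to fall back to the RDD API and merge the maps per id with reduceByKey. A minimal sketch, assuming df_a, df_b and a SparkSession named spark from the question's context:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}

val mergedRows = df_a.union(df_b).rdd
  .map(r => (r.getAs[String]("id"), r.getMap[String, String](r.fieldIndex("cMap"))))
  .reduceByKey((m1, m2) => m2 ++ m1) // entries of m1 win on duplicate keys; merge order is not deterministic
  .map { case (id, m) => Row(id, m) }

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("cMap", MapType(StringType, StringType))))

val merged = spark.createDataFrame(mergedRows, schema)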
Since Spark 3.0, you can:

- transform your map into an array of map entries with map_entries,
- collect those arrays by id with collect_set,
- flatten the collected arrays of entries with flatten,
- rebuild a map from the flattened entries with map_from_entries.

See the following code snippet, where input is your input dataframe:
import org.apache.spark.sql.functions.{col, collect_set, flatten, map_entries, map_from_entries}
input
  .withColumn("cMap", map_entries(col("cMap")))
  .groupBy("id")
  .agg(map_from_entries(flatten(collect_set("cMap"))).as("cMap"))
Given the following input dataframe:
+---+--------------------+
|id |cMap |
+---+--------------------+
|1 |[k1 -> v1] |
|1 |[k2 -> v2, k3 -> v3]|
|2 |[k4 -> v4] |
|2 |[] |
|3 |[k6 -> v6, k7 -> v7]|
+---+--------------------+
The above code snippet returns the following dataframe:
+---+------------------------------+
|id |cMap |
+---+------------------------------+
|1 |[k1 -> v1, k2 -> v2, k3 -> v3]|
|3 |[k6 -> v6, k7 -> v7] |
|2 |[k4 -> v4] |
+---+------------------------------+
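For reference, a self-contained way to reproduce this example (a sketch, assuming Spark 3.0+ and a SparkSession named spark; the sample data mirrors the tables above):

import org.apache.spark.sql.functions.{col, collect_set, flatten, map_entries, map_from_entries}
import spark.implicits._

val input = Seq(
  ("1", Map("k1" -> "v1")),
  ("1", Map("k2" -> "v2", "k3" -> "v3")),
  ("2", Map("k4" -> "v4")),
  ("2", Map.empty[String, String]),
  ("3", Map("k6" -> "v6", "k7" -> "v7"))
).toDF("id", "cMap")

input
  .withColumn("cMap", map_entries(col("cMap"))) // map -> array of key/value structs
  .groupBy("id")
  .agg(map_from_entries(flatten(collect_set("cMap"))).as("cMap"))
  .show(false)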
You have to use the explode function on the map column first to destructure the map into key and value columns, then union the resulting datasets followed by distinct to deduplicate, and only then groupBy with some custom Scala coding to aggregate the maps.

Stop talking and let's do some coding then...
Given the datasets:
scala> a.show(false)
+---+-----------------------+
|id |cMap |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+
scala> a.printSchema
root
|-- id: string (nullable = true)
|-- cMap: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
scala> b.show(false)
+---+-------------+
|id |cMap |
+---+-------------+
|one|Map(1 -> one)|
+---+-------------+
scala> b.printSchema
root
|-- id: string (nullable = true)
|-- cMap: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
You should first use the explode function on the map column.
explode(e: Column): Column Creates a new row for each element in the given array or map column.
val a_keyValues = a.select('*, explode($"cMap"))
scala> a_keyValues.show(false)
+---+-----------------------+---+-----+
|id |cMap |key|value|
+---+-----------------------+---+-----+
|one|Map(1 -> one, 2 -> two)|1 |one |
|one|Map(1 -> one, 2 -> two)|2 |two |
+---+-----------------------+---+-----+
val b_keyValues = b.select('*, explode($"cMap"))
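Note that the '* and $"cMap" column syntax assumes import spark.implicits._ is in scope; in spark-shell, where this session was recorded, it is imported automatically.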
With the following you'll have the distinct key-value pairs, which is exactly the deduplication you asked for:
val distinctKeyValues = a_keyValues.
  union(b_keyValues).
  select("id", "key", "value").
  distinct // <-- deduplicate
scala> distinctKeyValues.show(false)
+---+---+-----+
|id |key|value|
+---+---+-----+
|one|1 |one |
|one|2 |two |
+---+---+-----+
Time for groupBy and creating the final map column:
val result = distinctKeyValues.
  withColumn("map", map($"key", $"value")).
  groupBy("id").
  agg(collect_list("map")).
  as[(String, Seq[Map[String, String]])]. // <-- leave Rows for typed pairs
  map { case (id, list) => (id, list.reduce(_ ++ _)) }. // <-- collect all entries under one map
  toDF("id", "cMap") // <-- give the columns their names
scala> result.show(truncate = false)
+---+-----------------------+
|id |cMap |
+---+-----------------------+
|one|Map(1 -> one, 2 -> two)|
+---+-----------------------+
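One caveat: Scala's Map ++ keeps the right-hand operand's value on key collisions, so if the same key maps to different values in a and b, which value survives reduce(_ ++ _) depends on the order of the collected list. A quick illustration:

scala> Map("1" -> "one") ++ Map("1" -> "uno")
res0: scala.collection.immutable.Map[String,String] = Map(1 -> uno)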
Please note that unionAll has been deprecated since Spark 2.0.0 and union is the proper union operator:

(Since version 2.0.0) use union()
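Applied to the question's code, the substitution is a one-liner:

val df = df_a.union(df_b) // instead of df_a.unionAll(df_b)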