Len*_* D. 5 scala apache-spark
我目前的问题是以下一个...
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'mapField' cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.;;
我想要实现的只是基本上按给定的一组列对 DataFrame 中的条目进行分组,但是在与前面提到的 MapType 列分组时似乎失败了。
.groupBy(
...
"mapField",
...
)
Run Code Online (Sandbox Code Playgroud)
我有一些想法,但必须有一种更简单的方法来解决这个问题,而不是我想过的以下方法......
我已经得到了保存在 DF 中的连接字符串中的每个元素的键值,所以我可以将它们解析为 Map 然后使用 保存它withColumn,但还没有找到任何方法,我无法得到我的工作。这样做合理吗?
重新解析为 RDD 并将其分组,然后返回 DF(我认为太麻烦了)
编辑
示例输入
id | myMap
'sample' | Map('a' -> 1, 'b' -> 2, 'c' -> 3)
Run Code Online (Sandbox Code Playgroud)
期望输出
id | a | b | c
'sample' | 1 | 2 | 3
Run Code Online (Sandbox Code Playgroud)
正如错误所示map<string,string> is not an orderable data type,您将需要使用可排序类型来表示地图。其中一种类型是数组,因此我们可以使用map_values和map_keys将地图数据提取到两个不同的字段中,如下所示:
import org.apache.spark.sql.functions.{map_values, map_keys}
val df = Seq(
(Map("k1"->"v1"), 12),
(Map("k2"->"v2"), 11),
(null, 10)
).toDF("map", "id")
df.select(map_values($"map").as("map_values")).show
// +----------+
// |map_values|
// +----------+
// | [v1]|
// | [v2]|
// | null|
// +---------------+
df.select(map_keys($"map").as("map_keys")).show
// +--------+
// |map_keys|
// +--------+
// | [k1]|
// | [k2]|
// | null|
// +--------+
Run Code Online (Sandbox Code Playgroud)
然后你可以直接使用它groupBy:
df.groupBy("map_keys").count()
Run Code Online (Sandbox Code Playgroud)
以及一个通用的模块化解决方案,以便多次使用它:
import org.apache.spark.sql.functions.{map_values, map_keys}
val df = Seq(
(Map("k1"->"v1"), 12),
(Map("k2"->"v2"), 11),
(null, 10)
).toDF("map", "id")
df.select(map_values($"map").as("map_values")).show
// +----------+
// |map_values|
// +----------+
// | [v1]|
// | [v2]|
// | null|
// +---------------+
df.select(map_keys($"map").as("map_keys")).show
// +--------+
// |map_keys|
// +--------+
// | [k1]|
// | [k2]|
// | null|
// +--------+
Run Code Online (Sandbox Code Playgroud)
用法:df.transform(unwrapMap("map_field"))
| 归档时间: |
|
| 查看次数: |
1032 次 |
| 最近记录: |