无法在 Spark DataFrame 中按 MapType 列分组

Len*_* D. 5 scala apache-spark

我目前的问题是以下一个...

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'mapField' cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.;;

我想要实现的只是基本上按给定的一组列对 DataFrame 中的条目进行分组,但是在与前面提到的 MapType 列分组时似乎失败了。

  .groupBy(
    ...
    "mapField",
    ...
  )
Run Code Online (Sandbox Code Playgroud)

我有一些想法,但必须有一种更简单的方法来解决这个问题,而不是我想过的以下方法......

  • 我已经得到了保存在 DF 中的连接字符串中的每个元素的键值,所以我可以将它们解析为 Map 然后使用 保存它withColumn,但还没有找到任何方法,我无法得到我的工作。这样做合理吗?

  • 重新解析为 RDD 并将其分组,然后返回 DF(我认为太麻烦了)

编辑

示例输入

   id    |  myMap
'sample' |  Map('a' -> 1, 'b' -> 2, 'c' -> 3)
Run Code Online (Sandbox Code Playgroud)

期望输出

   id    |  a  |  b  |  c
'sample' |  1  |  2  |  3
Run Code Online (Sandbox Code Playgroud)

abi*_*sis 1

正如错误所示map<string,string> is not an orderable data type,您将需要使用可排序类型来表示地图。其中一种类型是数组,因此我们可以使用map_valuesmap_keys将地图数据提取到两个不同的字段中,如下所示:

import org.apache.spark.sql.functions.{map_values, map_keys}
val df = Seq(
    (Map("k1"->"v1"), 12),
    (Map("k2"->"v2"), 11),
    (null, 10) 
).toDF("map", "id")

df.select(map_values($"map").as("map_values")).show

// +----------+
// |map_values|
// +----------+
// |      [v1]|
// |      [v2]|
// |      null|
// +---------------+

df.select(map_keys($"map").as("map_keys")).show

// +--------+
// |map_keys|
// +--------+
// |    [k1]|
// |    [k2]|
// |    null|
// +--------+
Run Code Online (Sandbox Code Playgroud)

然后你可以直接使用它groupBy

df.groupBy("map_keys").count()
Run Code Online (Sandbox Code Playgroud)

以及一个通用的模块化解决方案,以便多次使用它:

import org.apache.spark.sql.functions.{map_values, map_keys}
val df = Seq(
    (Map("k1"->"v1"), 12),
    (Map("k2"->"v2"), 11),
    (null, 10) 
).toDF("map", "id")

df.select(map_values($"map").as("map_values")).show

// +----------+
// |map_values|
// +----------+
// |      [v1]|
// |      [v2]|
// |      null|
// +---------------+

df.select(map_keys($"map").as("map_keys")).show

// +--------+
// |map_keys|
// +--------+
// |    [k1]|
// |    [k2]|
// |    null|
// +--------+
Run Code Online (Sandbox Code Playgroud)

用法:df.transform(unwrapMap("map_field"))