How to get keys and values from a MapType column in a SparkSQL DataFrame

llo*_*ydh 11 scala dataframe apache-spark apache-spark-sql apache-spark-dataset

I have data in a Parquet file with two fields: object_id: String and alpha: Map<>.

It is read into a DataFrame in Spark SQL, and the schema looks like this:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)

I am using Spark 2.0 and I am trying to create a new DataFrame whose columns are object_id plus the keys of the ALPHA map, as in object_id, key1, key2, key2, ...

I first tried to see whether I could at least access the map like this:

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   alphaDF.map(a => a(0)).collect()

Unfortunately, I can't seem to figure out how to access the keys of the map.

Can someone show me a way to build a new DataFrame with object_id plus the map keys as column names and the map values as the corresponding values?

use*_*411 20

Spark >= 2.3

You can simplify the process with the map_keys function:

import org.apache.spark.sql.functions.map_keys

There is also a map_values function, but it is not directly useful here.
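
As a rough sketch of what these two functions return, assuming Spark 2.3 or later: the session setup, the sample data, and the names sample and keysAndValues below are illustrative and not part of the original answer. It is written as a standalone script; in spark-shell, spark and its implicits are already in scope, so the session lines can be dropped.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{map_keys, map_values}

// Assumption: a local session for experimentation only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("map-keys-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample data: an id and a map from String to Int.
val sample = Seq(
  (1, Map("foo" -> 10, "bar" -> 20)),
  (2, Map("foo" -> 30))
).toDF("id", "alpha")

// map_keys returns an array of the map's keys,
// map_values an array of the map's values.
val keysAndValues = sample.select(
  $"id",
  map_keys($"alpha").alias("keys"),
  map_values($"alpha").alias("values")
)

keysAndValues.printSchema()
keysAndValues.show(false)
```

Both functions produce array columns, which is why map_keys alone does not yield one column per key; the distinct keys still have to be collected first.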

Spark < 2.3

The general approach can be expressed in a few steps. First, the required imports:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

and some example data:

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

To extract the keys, we can use a UDF (Spark < 2.3):

val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

or the built-in function (Spark >= 2.3):

import org.apache.spark.sql.functions.map_keys

// ds is the example dataset defined above
val keysDF = ds.select(map_keys($"alpha"))

Find the distinct keys:

val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted
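
For Spark < 2.3, the UDF route to the same distinct keys might be put together as follows. This is only a sketch: the session setup is an assumption for running outside spark-shell, and the name mapKeysUdf is hypothetical, chosen so that it does not clash with the built-in functions.map_keys.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("map-keys-udf-sketch")
  .getOrCreate()
import spark.implicits._

// Same example data as in the answer: map values are (Int, String) tuples,
// which Spark stores as structs and hands to a UDF as Row.
val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

// Hypothetical name; the body is the UDF from the answer.
val mapKeysUdf = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

// One array of keys per row.
val keysDF = ds.select(mapKeysUdf($"alpha"))

// Flatten the arrays, deduplicate, and bring the keys to the driver.
val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted
```

With this example data, distinctKeys contains "bar" and "foo". Collecting the keys to the driver assumes the set of distinct keys is small.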

You can also generalize the key extraction with explode:

import org.apache.spark.sql.functions.explode

val distinctKeys = ds
  // Flatten the map column into key and value columns
  .select(explode($"alpha"))
  .select($"key")
  .as[String].distinct
  .collect.sorted

And finally select:

ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)
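
Putting the steps together, an end-to-end sketch using the explode variant (which does not need map_keys) could look like this. The session setup and the name wide are assumptions added for illustration; everything else follows the snippets above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("map-to-columns-sketch")
  .getOrCreate()
import spark.implicits._

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

// Collect the distinct map keys to the driver (assumes the key set is small).
val distinctKeys = ds
  .select(explode($"alpha")) // yields the columns "key" and "value"
  .select($"key")
  .as[String].distinct
  .collect.sorted

// One column per key; a row whose map lacks a key gets null in that column.
val wide = ds.select(
  $"id" +: distinctKeys.map(k => $"alpha".getItem(k).alias(k)): _*
)

wide.printSchema()
wide.show(false)
```

For the example data, the resulting columns are id, bar, and foo, with each key column holding the struct stored under that key.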