如何在spark sql中合并map列?

Nat*_*ats 2 apache-spark apache-spark-sql

我在数据框中有两个地图类型列。有没有办法可以使用 .withColumn 创建一个新的 Map 列,合并 Spark Sql 中的这两列?

val sampleDF = Seq(
 ("Jeff", Map("key1" -> "val1"), Map("key2" -> "val2"))
).toDF("name", "mapCol1", "mapCol2")

sampleDF.show()

+----+-----------------+-----------------+
|name|          mapCol1|          mapCol2|
+----+-----------------+-----------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|
+----+-----------------+-----------------+
Run Code Online (Sandbox Code Playgroud)

Ram*_*jan 5

您可以编写一个udf函数将两列合并为一列,withColumn如下所示

import org.apache.spark.sql.functions._
def mergeUdf = udf((map1: Map[String, String], map2: Map[String, String])=> map1 ++ map2)

sampleDF.withColumn("merged", mergeUdf(col("mapCol1"), col("mapCol2"))).show(false)
Run Code Online (Sandbox Code Playgroud)

这应该给你

+----+-----------------+-----------------+-------------------------------+
|name|mapCol1          |mapCol2          |merged                         |
+----+-----------------+-----------------+-------------------------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|Map(key1 -> val1, key2 -> val2)|
+----+-----------------+-----------------+-------------------------------+
Run Code Online (Sandbox Code Playgroud)

我希望答案有帮助


mrs*_*vas 5

仅当由于性能原因您的用例没有内置函数时才使用 UDF。

Spark 2.4 及以上版本

import org.apache.spark.sql.functions.{map_concat, col}

sampleDF.withColumn("map_concat", map_concat(col("mapCol1"), col("mapCol2"))).show(false)
Run Code Online (Sandbox Code Playgroud)

输出

+----+-----------------+-----------------+-------------------------------+
|name|mapCol1          |mapCol2          |map_concat                     |
+----+-----------------+-----------------+-------------------------------+
|Jeff|Map(key1 -> val1)|Map(key2 -> val2)|Map(key1 -> val1, key2 -> val2)|
+----+-----------------+-----------------+-------------------------------+
Run Code Online (Sandbox Code Playgroud)

Spark版本2.4以下

按照@RameshMaharjan 在这个问题中的回答创建一个 UDF ,但我添加了一个空检查以避免运行时的 NPE,如果不添加,最终会使作业失败。

import org.apache.spark.sql.functions.{udf, col}

val map_concat = udf((map1: Map[String, String],
                      map2: Map[String, String]) =>
  if (map1 == null) {
    map2
  } else if (map2 == null) {
    map1
  } else {
    map1 ++ map2
  })

sampleDF.withColumn("map_concat", map_concat(col("mapCol1"), col("mapCol2")))
 .show(false)
Run Code Online (Sandbox Code Playgroud)