在 SPARK 中将多列合并为单列

sas*_*hmi 2 apache-spark apache-spark-sql databricks

我的镶木地板文件中有以下格式的扁平化传入数据:

在此输入图像描述

我想将其转换为以下格式,其中我不展平我的结构:

在此输入图像描述

我尝试了以下方法:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
                explode(array("fullname_1", "fullname_2")).as("fullname"),
                explode(array("firstname_1", "firstname_2")).as("firstname"));
Run Code Online (Sandbox Code Playgroud)

但它给出了以下错误:

线程“main”org.apache.spark.sql.AnalysisException 中出现异常:每个 select 子句仅允许一个生成器,但发现 2 个:explode(array(fullname_1, fullname_2)),explode(array(firstname_1,firstname_2));

我理解这是因为你不能在查询中使用超过 1 个爆炸。我正在寻找在 Spark Java 中执行上述操作的选项。

Tra*_*ner 5

此类问题最容易通过.flatMap(). A.flatMap()与 a 类似,.map()不同之处在于它允许您为每个输入记录输出 n 条记录,而不是 1:1 的比率。

val df = Seq(
    (1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
    (2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
    ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")

df.flatMap(row => {
    val id = row.getAs[Int]("id")
    val cc = row.getAs[String]("country_code")
    Seq(
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1"))
    )
}).toDF("id", "country_code", "fullname", "firstname").show()
Run Code Online (Sandbox Code Playgroud)

结果如下:

+---+------------+-----------+---------+
| id|country_code|   fullname|firstname|
+---+------------+-----------+---------+
|  1|         USA|      Lee M|      Lee|
|  1|         USA|      Lee M|      Lee|
|  2|         CAN|Pate Poland|     Pate|
|  2|         CAN|Pate Poland|     Pate|
+---+------------+-----------+---------+
Run Code Online (Sandbox Code Playgroud)