Joining multiple DataFrames in Spark

Neh*_*eha 6 scala join apache-spark

I have 3 dataframes generated by 3 different processes. Each dataframe has columns with the same names. My dataframes look like this:

id   val1    val2       val3    val4
 1    null   null       null    null
 2    A2      A21       A31      A41

id   val1      val2       val3      val4
 1    B1        B21        B31       B41
 2    null      null       null      null

id   val1     val2       val3    val4
 1    C1       C2        C3       C4
 2    C11      C12       C13      C14

From these 3 dataframes, I want to create two dataframes (final and consolidated). For final, the order of preference is dataframe 1 > dataframe 2 > dataframe 3.

If a result exists in dataframe 1 (val1 != null), I store that row in the final dataframe.

My final result should be:

id  finalVal1    finalVal2   finalVal3   finalVal4 
1     B1           B21         B31         B41
2     A2           A21         A31         A41

The consolidated dataframe will store the results from all 3 dataframes.

How can I do this efficiently?

che*_*aux 10

If I understand correctly, for each row you want to find the first non-null values, looking first in the first table, then in the second table, then in the third table.

You simply need to join the three tables on id, and then use the coalesce function to get the first non-null element:

import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
    (1,null,null,null,null),
    (2,"A2","A21","A31", "A41"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
    (1,"B1","B21","B31", "B41"),
    (2,null,null,null,null))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
    (1,"C1","C2","C3","C4"),
    (2,"C11","C12","C13", "C14"))
  ).toDF("id", "val1", "val2", "val3", "val4")

// Join on id, then take the first non-null value per column,
// in priority order df1 > df2 > df3.
val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

This gives you the expected output:

+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
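If you want to see the preference rule in isolation, here is a minimal plain-Scala sketch of the same logic without Spark: for each id, take the first non-null value per column, scanning the sources in priority order t1 > t2 > t3. All names here (CoalesceSketch, t1/t2/t3) are illustrative, not part of any API.

```scala
object CoalesceSketch {
  type Row = Seq[Option[String]] // val1..val4 for one id
  type Table = Map[Int, Row]     // id -> row

  // Same data as the question, with null modeled as None.
  val t1: Table = Map(
    1 -> Seq(None, None, None, None),
    2 -> Seq(Some("A2"), Some("A21"), Some("A31"), Some("A41")))
  val t2: Table = Map(
    1 -> Seq(Some("B1"), Some("B21"), Some("B31"), Some("B41")),
    2 -> Seq(None, None, None, None))
  val t3: Table = Map(
    1 -> Seq(Some("C1"), Some("C2"), Some("C3"), Some("C4")),
    2 -> Seq(Some("C11"), Some("C12"), Some("C13"), Some("C14")))

  // Mirrors Spark's coalesce(): the first defined (non-null) value wins.
  def coalesce(opts: Seq[Option[String]]): Option[String] =
    opts.collectFirst { case Some(v) => v }

  val merged: Table =
    (t1.keySet ++ t2.keySet ++ t3.keySet).map { id =>
      val rows = Seq(t1, t2, t3).flatMap(_.get(id)) // rows present, in priority order
      id -> rows.transpose.map(coalesce)            // column-wise coalesce
    }.toMap

  def main(args: Array[String]): Unit =
    merged.toSeq.sortBy(_._1).foreach { case (id, row) =>
      println(s"$id: ${row.flatten.mkString(", ")}")
    }
}
```

Running it prints the same rows as the Spark version: id 1 resolves to the dataframe-2 values (B1, B21, B31, B41) because dataframe 1 is all null, and id 2 resolves to the dataframe-1 values (A2, A21, A31, A41).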