Neh*_*eha 6 scala join apache-spark
I have 3 dataframes generated by 3 different processes. Each of them has columns with the same names. My dataframes look like this:
dataFrame 1:

id  val1  val2  val3  val4
1   null  null  null  null
2   A2    A21   A31   A41

dataFrame 2:

id  val1  val2  val3  val4
1   B1    B21   B31   B41
2   null  null  null  null

dataFrame 3:

id  val1  val2  val3  val4
1   C1    C2    C3    C4
2   C11   C12   C13   C14
From these 3 dataframes I want to create two dataframes (final and consolidated). For the final one, the order of preference is dataFrame 1 > dataFrame 2 > dataFrame 3.
If a result is present in dataFrame 1 (val1 != null), I store that row in the final dataframe; otherwise I take it from dataFrame 2, and failing that from dataFrame 3.
My final result should be:
id  finalVal1  finalVal2  finalVal3  finalVal4
1   B1         B21        B31        B41
2   A2         A21        A31        A41
The consolidated dataframe will store the results from all 3 dataframes.
How can I do this efficiently?
che*_*aux 10
If I understand you correctly, for each row you want to find the first non-null values, looking first in the first table, then in the second, then in the third.
You simply need to join the three tables on id and then use the coalesce function to pick the first non-null element:
import org.apache.spark.sql.functions._
// In spark-shell the implicits needed for .toDF are already in scope;
// in a standalone application you would also need: import spark.implicits._

val df1 = sc.parallelize(Seq(
  (1, null, null, null, null),
  (2, "A2", "A21", "A31", "A41"))
).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
  (1, "B1", "B21", "B31", "B41"),
  (2, null, null, null, null))
).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
  (1, "C1", "C2", "C3", "C4"),
  (2, "C11", "C12", "C13", "C14"))
).toDF("id", "val1", "val2", "val3", "val4")

// Join the three dataframes on id, then take, per column, the first
// non-null value in the preference order df1 > df2 > df3.
// (This is the "final" dataframe in the question's terminology.)
val finalDf = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)
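To inspect the result, for example in a spark-shell session:

finalDf.show()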
This gives you the expected output:
+---+---------+---------+---------+---------+
| id|finalVal1|finalVal2|finalVal3|finalVal4|
+---+---------+---------+---------+---------+
|  1|       B1|      B21|      B31|      B41|
|  2|       A2|      A21|      A31|      A41|
+---+---------+---------+---------+---------+
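The question also asks for a consolidated dataframe holding the results of all 3 sources. If "consolidated" simply means stacking the three inputs, which all share the same schema, a union is enough. A minimal sketch under that assumption (the name consolidatedDf is just illustrative):

// All three dataframes share the same schema, so they can be stacked directly.
// union matches columns by position, not by name.
val consolidatedDf = df1.union(df2).union(df3)   // in Spark 1.x use unionAll
consolidatedDf.show()

One more caveat: the inner joins above drop any id that is missing from one of the three dataframes; if that can happen in your data, pass "outer" as the join type, e.g. df1.join(df2, Seq("id"), "outer").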