Am1*_*3zA · scala · dataframe · apache-spark
I have multiple Spark jobs reading from different sources. They have different schemas, but the schemas are quite close, and I want to write all of them to the same Redshift table. So I need to unify all the DataFrame schemas. What is the best way to do this?
Suppose the schema of the first input is as follows:
val schema1 = StructType(Seq(
StructField("date", DateType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("platform", StringType),
StructField("country", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType)
))
The schema of the second input source is as follows:
val schema2 = StructType(Seq(
StructField("date", DateType),
StructField("creator_id", StringType),
StructField("creator_name", StringType),
StructField("platform", StringType),
StructField("views", DoubleType),
StructField("installs", DoubleType),
StructField("spend", DoubleType),
StructField("ecpm", DoubleType)
))
The table schema (the expected unified DataFrame):
val finalSchema = StructType(Seq(
StructField("date", DateType),
StructField("account_name", StringType),
StructField("adset_id", StringType),
StructField("adset_name", StringType),
StructField("campaign_id", StringType),
StructField("campaign_name", StringType),
StructField("pub_id", StringType),
StructField("pub_name", StringType),
StructField("creative_id", StringType),
StructField("creative_name", StringType),
StructField("platform", StringType),
StructField("install_source", StringType),
StructField("views", IntegerType),
StructField("clicks", IntegerType),
StructField("installs", IntegerType),
StructField("cost", DoubleType)
))
As you can see in the final schema, some columns may not exist in an input schema, so they should be null; some columns also need to be renamed; and some columns, like ecpm, should be dropped.
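One way to do exactly that (rename, fill missing columns with typed nulls, drop extras) is to build a `select` list from the target schema. Below is a minimal sketch; the `conformTo` helper and the rename mappings (e.g. `"spend" -> "cost"`, `"creator_id" -> "pub_id"`) are my own assumptions about how the source columns map to the table, not something stated in the question.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Conform an input DataFrame to the target table schema:
// 1. rename columns according to a per-source mapping
// 2. for each target column: cast it if present, otherwise add a typed null
// 3. extras (e.g. "ecpm") are dropped automatically, since we select
//    only the target columns, in target order
def conformTo(df: DataFrame, target: StructType, renames: Map[String, String]): DataFrame = {
  val renamed = renames.foldLeft(df) { case (d, (from, to)) => d.withColumnRenamed(from, to) }
  val cols = target.fields.map { f =>
    if (renamed.columns.contains(f.name)) col(f.name).cast(f.dataType)
    else lit(null).cast(f.dataType).as(f.name)
  }
  renamed.select(cols: _*)
}
```

With a helper like this, each source only needs its own rename map, and the results can be combined with `unionByName` before writing to Redshift, e.g. `conformTo(df1, finalSchema, Map("spend" -> "cost")).unionByName(conformTo(df2, finalSchema, Map("spend" -> "cost", "creator_id" -> "pub_id", "creator_name" -> "pub_name")))` (the `creator_* -> pub_*` mapping is a guess).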
Add an index column to each dataframe and join them on that index, so that there is a one-to-one mapping between rows. After that, select only the columns you want from the joined dataframe.
If you have two dataframes like the following:
// df1.show
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
| Bob| 29|
| Tom| 26|
+-----+---+
//df2.show
+--------+-------+
| city|country|
+--------+-------+
| Delhi| India|
|New York| USA|
| London| UK|
+--------+-------+
Now add the index columns to get the one-to-one mapping:
import org.apache.spark.sql.functions._
val df1Index = df1.withColumn("index1", monotonically_increasing_id())
val df2Index = df2.withColumn("index2", monotonically_increasing_id())
val joinedDf = df1Index.join(df2Index, df1Index("index1") === df2Index("index2"))
//joinedDf
+-----+---+------+--------+-------+------+
| name|age|index1| city|country|index2|
+-----+---+------+--------+-------+------+
|Alice| 25| 0| Delhi| India| 0|
| Bob| 29| 1|New York| USA| 1|
| Tom| 26| 2| London| UK| 2|
+-----+---+------+--------+-------+------+
Now you can write the query like below:
val queryList=List(col("name"),col("age"),col("country"))
joinedDf.select(queryList:_*).show
//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25| India|
| Bob| 29| USA|
| Tom| 26| UK|
+-----+---+-------+
Views: 1729