将 Spark DataFrame 模式转换为新模式

Am1*_*3zA 5 scala dataframe apache-spark

我有多个从不同来源读取的 spark 作业,它们具有不同的架构,但它们非常接近,我想要做的是将它们全部写入同一个 Redshift 表,因此我需要统一所有 DataFrame 架构,什么是最好的方法吗?

假设第一个输入数据的架构如下:

  val schema1 = StructType(Seq(
    StructField("date", DateType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("platform", StringType),
    StructField("country", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType)
  ))
Run Code Online (Sandbox Code Playgroud)

seconf inout 源的架构如下:

  val schema2 = StructType(Seq(
    StructField("date", DateType),
    StructField("creator_id", StringType),
    StructField("creator_name", StringType),
    StructField("platform", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType),
    StructField("ecpm", DoubleType)
  ))
Run Code Online (Sandbox Code Playgroud)

表架构(预期统一数据帧):

  val finalSchema = StructType(Seq(
    StructField("date", DateType),
    StructField("account_name", StringType),
    StructField("adset_id", StringType),
    StructField("adset_name", StringType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("pub_id", StringType),
    StructField("pub_name", StringType),
    StructField("creative_id", StringType),
    StructField("creative_name", StringType),
    StructField("platform", StringType),
    StructField("install_source", StringType),
    StructField("views", IntegerType),
    StructField("clicks", IntegerType),
    StructField("installs", IntegerType),
    StructField("cost", DoubleType)
  ))
Run Code Online (Sandbox Code Playgroud)

正如您在最终架构中看到的,我有一些列可能不在输入架构中,因此它应该为空,某些列名称也应该重命名。一些列像ecpm应该被删除。

Man*_*akd 0

添加index columnsdataframesjoin它们的基础上index,这样就会有一对一的映射。之后select只有您想要columnsjoined dataframe

  1. 如果你有两个dataframes像下面这样的

    // df1.show
    +-----+---+
    | name|age|
    +-----+---+
    |Alice| 25|
    |  Bob| 29|
    |  Tom| 26|
    +-----+---+
    
    //df2.show
    +--------+-------+
    |    city|country|
    +--------+-------+
    |   Delhi|  India|
    |New York|    USA|
    |  London|     UK|
    +--------+-------+
    
    Run Code Online (Sandbox Code Playgroud)
  2. 现在添加index columns并获取一对一映射

    import org.apache.spark.sql.functions._
    
    val df1Index=df1.withColumn("index1",monotonicallyIncreasingId)
    
    val df2Index=df2.withColumn("index2",monotonicallyIncreasingId)
    
    val joinedDf=df1Index.join(df2Index,df1Index("index1")===df2Index("index2"))
    
    //joinedDf
    
    +-----+---+------+--------+-------+------+
    | name|age|index1|    city|country|index2|
    +-----+---+------+--------+-------+------+
    |Alice| 25|     0|   Delhi|  India|     0|
    |  Bob| 29|     1|New York|    USA|     1|
    |  Tom| 26|     2|  London|     UK|     2|
    +-----+---+------+--------+-------+------+
    
    Run Code Online (Sandbox Code Playgroud)

现在您可以像下面这样编写查询

val queryList=List(col("name"),col("age"),col("country"))
joinedDf.select(queryList:_*).show

//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25|  India|
|  Bob| 29|    USA|
|  Tom| 26|     UK|
+-----+---+-------+
Run Code Online (Sandbox Code Playgroud)