How to flatten elements of a DataFrame using functions.explode

Rob*_*lds 0 scala apache-spark

I have written this code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions

case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

object TestSparkDataFrame extends App {

  System.setProperty("hadoop.home.dir", "E:\\Programmation\\Libraries\\hadoop")
  val conf = new SparkConf().setAppName("TestSparkDataFrame").set("spark.driver.memory","4g").setMaster("local[*]")
  val session = SparkSession.builder().config(conf).getOrCreate()

  import session.implicits._

  def createAndPrintSchemaRawPanda(session: SparkSession): DataFrame = {
    val newPanda = RawPanda(1, "M1B 5K7", "giant", true, Array(0.1, 0.1))
    val pandaPlace = PandaPlace("torronto", Array(newPanda))
    val df = session.createDataFrame(Seq(pandaPlace))
    df
  }
  val df2 = createAndPrintSchemaRawPanda(session)
  df2.show

+--------+--------------------+
|    name|              pandas|
+--------+--------------------+
|torronto|[[1,M1B 5K7,giant...|
+--------+--------------------+


  val pandaInfo = df2.explode(df2("pandas")) {
    case Row(pandas: Seq[Row]) =>
      pandas.map {
        case Row(
          id: Long,
          zip: String,
          pt: String,
          happy: Boolean,
          attrs: Seq[Double]) => RawPanda(id, zip, pt, happy, attrs.toArray)
      }
  }

  pandaInfo.show

+--------+--------------------+---+-------+-----+-----+----------+
|    name|              pandas| id|    zip|   pt|happy|attributes|
+--------+--------------------+---+-------+-----+-----+----------+
|torronto|[[1,M1B 5K7,giant...|  1|M1B 5K7|giant| true|[0.1, 0.1]|
+--------+--------------------+---+-------+-----+-----+----------+

The problem is that the explode function I am using is deprecated, so I would like to recompute the pandaInfo DataFrame using the approach recommended by the deprecation warning:

use flatMap() or select() with functions.explode() instead

But when I do:

 val pandaInfo = df2.select(functions.explode(df2("pandas")))

I get the same result that I already had in df2. I don't know how to go further with flatMap or functions.explode.

How can I use flatMap or functions.explode to get the result I want (the one in pandaInfo)?

I have looked at this post and this other post, but neither of them helped me.

Tza*_*har 5

Calling select with the explode function returns a DataFrame in which the pandas array is "exploded" into individual records; then, if you want to "flatten" the structure of the single resulting RawPanda in each record, you can select the individual columns using dot-separated "routes":

val pandaInfo2 = df2.select($"name", explode($"pandas") as "pandas")
  .select($"name", $"pandas",
    $"pandas.id" as "id",
    $"pandas.zip" as "zip",
    $"pandas.pt" as "pt",
    $"pandas.happy" as "happy",
    $"pandas.attributes" as "attributes"
  )
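Conceptually, this is the same flattening that flatMap performs on an ordinary Scala collection: every element of the nested array becomes its own (name, panda) row. A plain-Scala sketch of that shape (no Spark involved; the data is made up to mirror the question):

```scala
case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

object FlatMapSketch extends App {
  val places = Seq(
    PandaPlace("torronto", Array(RawPanda(1, "M1B 5K7", "giant", happy = true, Array(0.1, 0.1)))))

  // One (name, panda) pair per array element -- the row shape explode produces
  val rows: Seq[(String, RawPanda)] =
    places.flatMap(place => place.pandas.map(panda => (place.name, panda)))

  rows.foreach { case (name, panda) => println(s"$name ${panda.id} ${panda.zip} ${panda.pt}") }
}
```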

A less verbose version of the exact same operation would be:

import org.apache.spark.sql.Encoders // going to use this to "encode" case class into schema
val pandaColumns = Encoders.product[RawPanda].schema.fields.map(_.name)

val pandaInfo3 = df2.select($"name", explode($"pandas") as "pandas")
  .select(Seq($"name", $"pandas") ++ pandaColumns.map(f => $"pandas.$f" as f): _*)
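The `: _*` at the end of that select splices the built-up sequence of columns into the varargs parameter that select expects. The same Scala pattern in isolation (plain Scala; the select helper here is a stand-in, not Spark's):

```scala
object VarargsSketch extends App {
  // Stand-in for a varargs signature like Spark's select(cols: Column*)
  def select(cols: String*): String = cols.mkString(", ")

  val fixed = Seq("name", "pandas")
  val pandaColumns = Array("id", "zip", "pt", "happy", "attributes")

  // Seq ++ Array concatenates into one Seq; `: _*` passes it as varargs
  println(select(fixed ++ pandaColumns.map(f => s"pandas.$f as $f"): _*))
}
```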