Rob*_*lds 0 scala apache-spark
我已经制作了这段代码:
case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])
object TestSparkDataFrame extends App{
System.setProperty("hadoop.home.dir", "E:\\Programmation\\Libraries\\hadoop")
val conf = new SparkConf().setAppName("TestSparkDataFrame").set("spark.driver.memory","4g").setMaster("local[*]")
val session = SparkSession.builder().config(conf).getOrCreate()
import session.implicits._
def createAndPrintSchemaRawPanda(session:SparkSession):DataFrame = {
val newPanda = RawPanda(1,"M1B 5K7", "giant", true, Array(0.1, 0.1))
val pandaPlace = PandaPlace("torronto", Array(newPanda))
val df =session.createDataFrame(Seq(pandaPlace))
df
}
val df2 = createAndPrintSchemaRawPanda(session)
df2.show
+--------+--------------------+
| name| pandas|
+--------+--------------------+
|torronto|[[1,M1B 5K7,giant...|
+--------+--------------------+
val pandaInfo = df2.explode(df2("pandas")) {
case Row(pandas: Seq[Row]) =>
pandas.map{
case (Row(
id: Long,
zip: String,
pt: String,
happy: Boolean,
attrs: Seq[Double])) => RawPanda(id, zip, pt , happy, attrs.toArray)
}
}
pandaInfo2.show
+--------+--------------------+---+-------+-----+-----+----------+
| name| pandas| id| zip| pt|happy|attributes|
+--------+--------------------+---+-------+-----+-----+----------+
|torronto|[[1,M1B 5K7,giant...| 1|M1B 5K7|giant| true|[0.1, 0.1]|
+--------+--------------------+---+-------+-----+-----+----------+
Run Code Online (Sandbox Code Playgroud)
我使用的爆炸函数已被弃用的问题,所以我想重新计算 pandaInfo2 数据帧,但使用警告中的建议方法。
将 flatMap() 或 select() 与 functions.explode() 一起使用
但是当我这样做时:
val pandaInfo = df2.select(functions.explode(df("pandas"))
Run Code Online (Sandbox Code Playgroud)
我得到的结果与我在 df2 中得到的结果相同。我不知道如何继续使用 flatMap 或 functions.explode。
我如何使用 flatMap 或 functions.explode 来获得我想要的结果?(pandaInfo 中的那个)
select使用explode函数调用返回一个 DataFrame,其中 Arraypandas被“分解”为单独的记录;然后,如果您想“展平”每条记录生成的单个“RawPanda ”的结构,您可以使用点分隔的“路由”选择各个列:
val pandaInfo2 = df2.select($"name", explode($"pandas") as "pandas")
.select($"name", $"pandas",
$"pandas.id" as "id",
$"pandas.zip" as "zip",
$"pandas.pt" as "pt",
$"pandas.happy" as "happy",
$"pandas.attributes" as "attributes"
)
Run Code Online (Sandbox Code Playgroud)
完全相同操作的一个不太详细的版本是:
import org.apache.spark.sql.Encoders // going to use this to "encode" case class into schema
val pandaColumns = Encoders.product[RawPanda].schema.fields.map(_.name)
val pandaInfo3 = df2.select($"name", explode($"pandas") as "pandas")
.select(Seq($"name", $"pandas") ++ pandaColumns.map(f => $"pandas.$f" as f): _*)
Run Code Online (Sandbox Code Playgroud)