fle*_*chr 1 scala apache-spark apache-spark-sql
我正在一个项目上,正在处理具有复杂模式/数据结构的一些嵌套JSON日期。基本上,我想做的是在数据框中过滤掉其中的一列,以便选择数组中的最后一个元素。我完全坚持如何做到这一点。我希望这是有道理的。
以下是我要完成的示例:
val singersDF = Seq(
("beatles", "help,hey,jude"),
("romeo", "eres,mia"),
("elvis", "this,is,an,example")
).toDF("name", "hit_songs")
val actualDF = singersDF.withColumn(
"hit_songs",
split(col("hit_songs"), "\\,")
)
actualDF.show(false)
actualDF.printSchema()
+-------+-----------------------+
|name |hit_songs |
+-------+-----------------------+
|beatles|[help, hey, jude] |
|romeo |[eres, mia] |
|elvis |[this, is, an, example]|
+-------+-----------------------+
root
|-- name: string (nullable = true)
|-- hit_songs: array (nullable = true)
| |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
输出的最终目标将是以下内容,以选择hit_songs数组中的最后一个“字符串”。
我不担心之后的架构是什么样的。
+-------+---------+
|name |hit_songs|
+-------+---------+
|beatles|jude |
|romeo |mia |
|elvis |example |
+-------+---------+
Run Code Online (Sandbox Code Playgroud)
从spark 2.4+开始,您可以使用支持负索引的element_at。正如您在本文档引用中所看到的:
element_at(array, index) - 返回给定(从 1 开始)索引处的数组元素。如果索引 < 0,则访问从最后一个到第一个的元素。如果索引超过数组长度,则返回 NULL。
这样,获取最后一个元素的方法如下:
import org.apache.spark.sql.functions.element_at
actualDF.withColumn("hit_songs", element_at($"hit_songs", -1))
Run Code Online (Sandbox Code Playgroud)
首先让我们准备一个带有数组列的示例数据框:
val columns = Seq("col1")
val data = Seq((Array(1,2,3)))
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF(columns:_*)
Run Code Online (Sandbox Code Playgroud)
看起来像这样:
scala> df.show()
+---------+
| col1|
+---------+
|[1, 2, 3]|
+---------+
Run Code Online (Sandbox Code Playgroud)
然后,应用element_at获取最后一个元素,如下所示:
scala> df.withColumn("last_value", element_at($"col1", -1)).show()
+---------+----------+
| col1|last_value|
+---------+----------+
|[1, 2, 3]| 3|
+---------+----------+
Run Code Online (Sandbox Code Playgroud)
您可以使用该size函数计算数组中所需项目的索引,然后将其作为Column.apply(显式或隐式)的参数传递:
import org.apache.spark.sql.functions._
import spark.implicits._
actualDF.withColumn("hit_songs", $"hit_songs".apply(size($"hit_songs").minus(1)))
Run Code Online (Sandbox Code Playgroud)
要么:
actualDF.withColumn("hit_songs", $"hit_songs"(size($"hit_songs").minus(1)))
Run Code Online (Sandbox Code Playgroud)
这是一种方法:
val actualDF = Seq(
("beatles", Seq("help", "hey", "jude")),
("romeo", Seq("eres", "mia")),
("elvis", Seq("this", "is", "an", "example"))
).toDF("name", "hit_songs")
import org.apache.spark.sql.functions._
actualDF.withColumn("total_songs", size($"hit_songs")).
select($"name", $"hit_songs"($"total_songs" - 1).as("last_song"))
// +-------+---------+
// | name|last_song|
// +-------+---------+
// |beatles| jude|
// | romeo| mia|
// | elvis| example|
// +-------+---------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2184 次 |
| 最近记录: |