选择DataFrame中数组的最后一个元素

fle*_*chr 1 scala apache-spark apache-spark-sql

我正在一个项目上,正在处理具有复杂模式/数据结构的一些嵌套JSON日期。基本上,我想做的是在数据框中过滤掉其中的一列,以便选择数组中的最后一个元素。我完全坚持如何做到这一点。我希望这是有道理的。

以下是我要完成的示例:

val singersDF = Seq(
  ("beatles", "help,hey,jude"),
  ("romeo", "eres,mia"),
  ("elvis", "this,is,an,example")
).toDF("name", "hit_songs")

val actualDF = singersDF.withColumn(
  "hit_songs",
  split(col("hit_songs"), "\\,")
)

actualDF.show(false)
actualDF.printSchema() 

+-------+-----------------------+
|name   |hit_songs              |
+-------+-----------------------+
|beatles|[help, hey, jude]      |
|romeo  |[eres, mia]            |
|elvis  |[this, is, an, example]|
+-------+-----------------------+
root
 |-- name: string (nullable = true)
 |-- hit_songs: array (nullable = true)
 |    |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

输出的最终目标将是以下内容,以选择hit_songs数组中的最后一个“字符串”。

我不担心之后的架构是什么样的。

+-------+---------+
|name   |hit_songs|
+-------+---------+
|beatles|jude     |
|romeo  |mia      |
|elvis  |example  |
+-------+---------+
Run Code Online (Sandbox Code Playgroud)

Moh*_*OUI 9

spark 2.4+开始,您可以使用支持负索引的element_at。正如您在本文档引用中所看到的:

element_at(array, index) - 返回给定(从 1 开始)索引处的数组元素。如果索引 < 0,则访问从最后一个到第一个的元素。如果索引超过数组长度,则返回 NULL。

这样,获取最后一个元素的方法如下:

import org.apache.spark.sql.functions.element_at
actualDF.withColumn("hit_songs", element_at($"hit_songs", -1))
Run Code Online (Sandbox Code Playgroud)

可重现的例子:

首先让我们准备一个带有数组列的示例数据框:

val columns = Seq("col1")
val data = Seq((Array(1,2,3)))
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF(columns:_*)
Run Code Online (Sandbox Code Playgroud)

看起来像这样:

scala> df.show()
+---------+
|     col1|
+---------+
|[1, 2, 3]|
+---------+
Run Code Online (Sandbox Code Playgroud)

然后,应用element_at获取最后一个元素,如下所示:

scala> df.withColumn("last_value", element_at($"col1", -1)).show()
+---------+----------+
|     col1|last_value|
+---------+----------+
|[1, 2, 3]|         3|
+---------+----------+
Run Code Online (Sandbox Code Playgroud)


Tza*_*har 8

您可以使用该size函数计算数组中所需项目的索引,然后将其作为Column.apply(显式或隐式)的参数传递:

import org.apache.spark.sql.functions._
import spark.implicits._

actualDF.withColumn("hit_songs", $"hit_songs".apply(size($"hit_songs").minus(1)))
Run Code Online (Sandbox Code Playgroud)

要么:

actualDF.withColumn("hit_songs", $"hit_songs"(size($"hit_songs").minus(1)))
Run Code Online (Sandbox Code Playgroud)


Leo*_*o C 5

这是一种方法:

val actualDF = Seq(
  ("beatles", Seq("help", "hey", "jude")),
  ("romeo", Seq("eres", "mia")),
  ("elvis", Seq("this", "is", "an", "example"))
).toDF("name", "hit_songs")

import org.apache.spark.sql.functions._

actualDF.withColumn("total_songs", size($"hit_songs")).
  select($"name", $"hit_songs"($"total_songs" - 1).as("last_song"))
// +-------+---------+
// |   name|last_song|
// +-------+---------+
// |beatles|     jude|
// |  romeo|      mia|
// |  elvis|  example|
// +-------+---------+
Run Code Online (Sandbox Code Playgroud)