Kyl*_*ton 5 scala apache-spark apache-spark-sql
我有一个Scala spark DataFrame:
df.select($"row_id", $"array_of_data").show
+----------+--------------------+
| row_id | array_of_data |
+----------+--------------------+
| 0 | [123, ABC, G12] |
| 1 | [100, 410] |
| 2 | [500, 300, ...] |
Run Code Online (Sandbox Code Playgroud)
我想爆炸这些数组,以便每个元素都在不同的行中,但我还想标记哪一行对应于数组的第一个元素:
+----------+--------------------+----------+----------+
| row_id | array_of_data | exploded | is_first |
+----------+--------------------+----------+----------+
| 0 | [123, ABC, G12] | 123 | Yes |
| 0 | [123, ABC, G12] | ABC | No |
| 0 | [123, ABC, G12] | G12 | No |
Run Code Online (Sandbox Code Playgroud)
为实现这一点,我使用了explode函数,并希望第一行对应第一个数据元素:
var exploded_df = df.withColumn("exploded", explode($"array_of_data"))
val window = Window.partitionBy("row_id").orderBy("row_id")
// Create an internal rank variable to figure out the first element
exploded_df = exploded_df.withColumn("_rank", row_number().over(window))
exploded_df = exploded_df.withColumn("is_first",
when(($"_rank" === 1), "Yes").otherwise("No")
)
Run Code Online (Sandbox Code Playgroud)
这似乎适用于我的目的并产生所需的输出,但我相信这将始终有效吗?我无法在爆炸文档中找到承诺此行为的任何地方,并且信任Spark数据帧中的行顺序似乎是不明智的.
我能想到的唯一其他解决方案是为每个元素创建一个新列array_of_data,然后匹配exploded等于第一列中的值,但我不保证数组中不会有重复值.
为此,可以使用posexplode函数。
正如api文档所解释的那样
为每个元素在给定数组或地图列中的位置创建一个新行。
您可以使用selectfunction,以便position和爆炸列形成单独的列,如下所示:
import org.apache.spark.sql.functions._
df.select($"row_id", posexplode($"array_of_data")).show(false)
Run Code Online (Sandbox Code Playgroud)
这应该给你
+------+---------------+---+---+
|row_id|array_of_data |pos|col|
+------+---------------+---+---+
|0 |[123, ABC, G12]|0 |123|
|0 |[123, ABC, G12]|1 |ABC|
|0 |[123, ABC, G12]|2 |G12|
+------+---------------+---+---+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1931 次 |
| 最近记录: |