Spark 数据集 withColumn 添加分区 id

Hay*_*Luo -2 scala dataset hadoop-partitioning apache-spark

我正在尝试编写一个辅助函数,该函数采用任何类型的数据集Dataset[_],并返回一个新列“partitionId”,该列是单个数据单元所属的分区的 ID。

例如,如果我在下面有一个数据集,并且默认情况下它有两个分区。

+-----+------+
| colA|  colB|
+-----+------+
|   1 |     a|
|   2 |     b|
|   3 |     c|
+-----+------+
Run Code Online (Sandbox Code Playgroud)

函数执行后,应该是下面的结果,前两个数据单元属于同一个分区,第三个属于另一个分区。

+-----+------+------------+
| colA|  colB| partitionId|
+-----+------+------------+
|   1 |     a|           1|
|   2 |     b|           1|
|   3 |     c|           2|
+-----+------+------------+
Run Code Online (Sandbox Code Playgroud)

我尝试过 withColumn() 和 mapPartitions(),但没有一个对我有用。对于withColumn(),我无法获取数据单元所属分区的信息,例如withColumn("partitionId", {What should be here to add the partitionId?}) 对于mapPartitions(),我尝试了:

+-----+------+
| colA|  colB|
+-----+------+
|   1 |     a|
|   2 |     b|
|   3 |     c|
+-----+------+
Run Code Online (Sandbox Code Playgroud)

但这仅适用于特定类型,例如Dataset[MyDataType],不适用于Dataset[_]

如何为任何数据集添加 partitionId 列?

小智 5

是否有理由需要每条记录的分区 ID?无论哪种方式,您都可以通过以下方式实现:

import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id)
Run Code Online (Sandbox Code Playgroud)