Hay*_*Luo -2 scala dataset hadoop-partitioning apache-spark
Dataset[_],并返回一个新列“partitionId”,该列是单个数据单元所属的分区的 ID。例如,如果我在下面有一个数据集,并且默认情况下它有两个分区。
+-----+------+
| colA| colB|
+-----+------+
| 1 | a|
| 2 | b|
| 3 | c|
+-----+------+
Run Code Online (Sandbox Code Playgroud)
函数执行后,应该是下面的结果,前两个数据单元属于同一个分区,第三个属于另一个分区。
+-----+------+------------+
| colA| colB| partitionId|
+-----+------+------------+
| 1 | a| 1|
| 2 | b| 1|
| 3 | c| 2|
+-----+------+------------+
Run Code Online (Sandbox Code Playgroud)
我尝试过 withColumn() 和 mapPartitions(),但没有一个对我有用。对于withColumn(),我无法获取数据单元所属分区的信息,例如withColumn("partitionId", {What should be here to add the partitionId?})
对于mapPartitions(),我尝试了:
+-----+------+
| colA| colB|
+-----+------+
| 1 | a|
| 2 | b|
| 3 | c|
+-----+------+
Run Code Online (Sandbox Code Playgroud)
但这仅适用于特定类型,例如Dataset[MyDataType],不适用于Dataset[_]
如何为任何数据集添加 partitionId 列?
小智 5
是否有理由需要每条记录的分区 ID?无论哪种方式,您都可以通过以下方式实现:
import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1700 次 |
| 最近记录: |