Dav*_*lla 6 scala apache-spark
spark foreachPartition,如何获取分区的索引(或序列号,或识别分区的东西)?
val docs: RDD[String] = ...
println("num partitions: " + docs.getNumPartitions)
docs.foreachPartition((it: Iterator[String]) => {
println("partition index: " + ???)
it.foreach(...)
})
Run Code Online (Sandbox Code Playgroud)
您可以使用TaskContext(如何在Spark中获取地图任务的ID?):
import org.apache.spark.TaskContext
rdd.foreachPartition((it: Iterator[String]) => {
println(TaskContext.getPartitionId)
})
Run Code Online (Sandbox Code Playgroud)
不完全相同,但您可以使用RDD.mapPartitionsWithIndex并返回Iterator[Unit]一个结果:
val rdd: RDD[Unit] = docs.mapPartitionsWithIndex { case (idx, it) =>
println("partition index: " + ???)
it.foreach(...)
}
Run Code Online (Sandbox Code Playgroud)
但是你必须记住实现 RDD
另一种方法是使用mapPartitionsWithIndex来执行与转换数据相关的逻辑,然后foreachRDD仅使用向外部发送数据。