spark foreachPartition,如何获取每个分区的索引?

Dav*_*lla 6 scala apache-spark

spark foreachPartition,如何获取分区的索引(或序列号,或识别分区的东西)?

val docs: RDD[String] = ...

println("num partitions: " + docs.getNumPartitions)

docs.foreachPartition((it: Iterator[String]) => {
  println("partition index: " + ???)
  it.foreach(...)
})
Run Code Online (Sandbox Code Playgroud)

hi-*_*zir 7

您可以使用TaskContext(如何在Spark中获取地图任务的ID?):

import org.apache.spark.TaskContext

rdd.foreachPartition((it: Iterator[String]) => {
  println(TaskContext.getPartitionId)
})
Run Code Online (Sandbox Code Playgroud)


Yuv*_*kov 5

不完全相同,但您可以使用RDD.mapPartitionsWithIndex并返回Iterator[Unit]一个结果:

val rdd: RDD[Unit] = docs.mapPartitionsWithIndex { case (idx, it) => 
  println("partition index: " + ???)
  it.foreach(...)
}
Run Code Online (Sandbox Code Playgroud)

但是你必须记住实现 RDD

另一种方法是使用mapPartitionsWithIndex来执行与转换数据相关的逻辑,然后foreachRDD仅使用向外部发送数据。