获取DataFrame的当前分区数

kec*_*cso 55 dataframe apache-spark apache-spark-sql

有没有办法获得DataFrame的当前分区数?我检查了DataFrame javadoc(spark 1.6)并没有找到方法,或者我只是错过了它?(在JavaRDD的情况下,有一个getNumPartitions()方法.)

use*_*931 108

您需要调用getNumPartitions()DataFrame的底层RDD,例如df.rdd.getNumPartitions().在Scala的情况下,这是一个无参数的方法:df.rdd.getNumPartitions.

  • 这很贵 (5认同)
  • 这是否会导致从`DF` 到`RDD` 的*转换*(_昂贵的_)? (3认同)
  • 减去(),因此并不完全正确-至少不是在SCALA模式下 (2认同)

Ram*_*ram 14

dataframe.rdd.partitions.sizedf.rdd.getNumPartitions()或以外的另一种选择df.rdd.length

让我用完整的例子向您解释一下...

val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4
Run Code Online (Sandbox Code Playgroud)

为了证明上面有多少个分区...将数据帧另存为csv

numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)
Run Code Online (Sandbox Code Playgroud)

这是在不同分区上分离数据的方式。

Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10
Run Code Online (Sandbox Code Playgroud)

更新:

@Hemanth在评论中问了一个很好的问题...基本上在上述情况下为什么分区数为4

简短答案:取决于您执行的情况。自从我使用local [4]以来,我得到了4个分区。

长答案:

我在本地计算机上运行上述程序,并根据其作为4个分区,将master用作local [4] 。

val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[4]").getOrCreate()
Run Code Online (Sandbox Code Playgroud)

如果它的火花壳在母纱中,我得到的分隔数为2

示例:spark-shell --master yarn并再次键入相同的命令

scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]

scala> numberDF.rdd.partitions.size
res0: Int = 2
Run Code Online (Sandbox Code Playgroud)
  • 这里2是spark的默认并行
  • 基于hashpartitioner,spark将决定要分配多少个分区。如果您正在运行--master local并基于您的Runtime.getRuntime.availableProcessors() ie local[Runtime.getRuntime.availableProcessors()],它将尝试分配那些分区数。如果可用的处理器数量为12(即local[Runtime.getRuntime.availableProcessors()]),您有1到10的列表),那么将仅创建10个分区。

注意:

如果您使用的是我正在执行Spark程序的12核笔记本电脑,并且默认情况下,分区/任务的数量是所有可用核的数量,即12。这意味着local[*]s"local[${Runtime.getRuntime.availableProcessors()}]")在这种情况下,只有10个数字,因此它将限制至10

请牢记所有这些指示,我建议您自己尝试


小智 7

转换为RDD然后获取分区长度

DF.rdd.partitions.length
Run Code Online (Sandbox Code Playgroud)


lon*_*tar 6

 val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

df.rdd.getNumPartitions
Run Code Online (Sandbox Code Playgroud)