Apache Spark:获取每个分区的记录数

nil*_*212 13 hadoop scala partitioning apache-spark apache-spark-sql

我想查看如何获取有关每个分区的信息,例如总数.当使用部署模式作为纱线群集提交Spark作业以便在控制台上记录或打印时,驱动程序端的每个分区中的记录数.

hi-*_*zir 18

我会使用内置功能.它应该尽可能高效:

import org.apache.spark.sql.functions.spark_partition_id

df.groupBy(spark_partition_id).count
Run Code Online (Sandbox Code Playgroud)

  • 您可以使用`df.withColumn("partition_id",spark_partition_id).groupBy("partition_id").count` for 1.6 (3认同)

Rap*_*oth 17

你可以得到每个分区的记录数,如下所示:

df
  .rdd
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_number","number_of_records")
  .show
Run Code Online (Sandbox Code Playgroud)

但是这也会自动启动Spark Job(因为必须通过spark读取文件才能获得记录数).

Spark也可能会读取hive表统计信息,但我不知道如何显示这些元数据.


Bis*_*oyM 13

对于未来的 PySpark 用户:

from pyspark.sql.functions  import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()
Run Code Online (Sandbox Code Playgroud)


Tag*_*gar 5

火花/斯卡拉:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect()  # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length)  # check if skewed
Run Code Online (Sandbox Code Playgroud)

派斯帕克:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed
Run Code Online (Sandbox Code Playgroud)

同样的情况也适用于 a dataframe,而不仅仅是 an RDD。只需将DF.rdd.glom... 添加到上面的代码中即可。

致谢:Mike Dusenberry @ https://issues.apache.org/jira/browse/SPARK-17817