如何获取分区中的元素数量?

Geo*_*Geo 14 partitioning apache-spark

在给定分区ID的情况下,有没有办法获得spark RDD分区中的元素数量?不扫描整个分区.

像这样的东西:

Rdd.partitions().get(index).size()
Run Code Online (Sandbox Code Playgroud)

除了我没有看到这样的火花API.有任何想法吗?解决方法?

谢谢

pze*_*vic 25

下面给出了一个新的RDD,其元素是每个分区的大小:

rdd.mapPartitions(iter => Array(iter.size).iterator, true) 
Run Code Online (Sandbox Code Playgroud)


Tag*_*gar 19

PySpark:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed
Run Code Online (Sandbox Code Playgroud)

星火/斯卡拉:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions )
val l = a.glom().map(_.length).collect()  # get length of each partition
print(l.min, l.max, l.sum/l.length, l.length)  # check if skewed
Run Code Online (Sandbox Code Playgroud)

致谢:Mike Dusenberry @ https://issues.apache.org/jira/browse/SPARK-17817

数据帧也是如此,而不仅仅是RDD.只需将DF.rdd.glom ...添加到上面的代码中即可.


Roh*_*ala 5

我知道我有点晚了,但我有另一种方法通过利用 Spark 的内置函数来获取分区中的元素数量。适用于 Spark 2.1 以上版本。

说明:我们将创建一个示例数据帧 (df),获取分区 id,对分区 id 进行分组,并对每条记录进行计数。

皮斯帕克:

>>> from pyspark.sql.functions import spark_partition_id, count as _count
>>> df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
>>> df.rdd.getNumPartitions()
4
>>> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(_count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
|           0|        48|
|           1|        44|
|           2|        32|
|           3|        48|
+------------+----------+
Run Code Online (Sandbox Code Playgroud)

斯卡拉:

scala> val df = spark.sql("set -v").unionAll(spark.sql("set -v")).repartition(4)
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [key: string, value: string ... 1 more field]

scala> df.rdd.getNumPartitions
res0: Int = 4

scala> df.withColumn("partition_id", spark_partition_id()).groupBy("partition_id").agg(count("key")).orderBy("partition_id").show()
+------------+----------+
|partition_id|count(key)|
+------------+----------+
|           0|        48|
|           1|        44|
|           2|        32|
|           3|        48|
+------------+----------+
Run Code Online (Sandbox Code Playgroud)