默认情况下,Spark Dataframe 是如何分区的?

Met*_*est 2 apache-spark rdd apache-spark-sql

我知道使用 HashPartitioner 根据键值对 RDD 进行分区。但是 Spark Dataframe 默认是如何分区的,因为它没有键/值的概念。

mik*_*ike 6

Dataframe 的分区取决于运行以创建它的任务数量。

没有应用“默认”分区逻辑。以下是如何设置分区的一些示例:

  • 通过创建的 Dataframeval df = Seq(1 to 500000: _*).toDF()将只有一个分区。
  • 通过创建的 Dataframeval df = spark.range(0,100).toDF()具有与可用内核数量一样多的分区(例如,当您的 master 设置为 4 时local[4])。另外,请参阅下面关于“默认并行性”的评论,该功能对parallelize没有父 RDD 的操作生效。
  • 从 RDD ( spark.createDataFrame(rdd, schema))派生的 Dataframe将具有与底层 RDD 相同数量的分区。就我而言,由于我在本地有 6 个内核,因此 RDD 由 6 个分区创建。
  • 从 Kafka 主题消费的 Dataframe 将具有与主题的分区匹配的分区数量,因为它可以使用与主题有分区一样多的内核/插槽来消费主题。
  • 通过从 HDFS 等读取文件创建的数据帧将具有与文件匹配的分区数量,除非必须将单个文件拆分为多个分区spark.sql.files.maxPartitionBytes,默认为 128MB。
  • 从需要 shuffle 的转换派生的 Dataframe 将具有设置的可配置分区数量spark.sql.shuffle.partitions(默认为 200)。
  • ...

RDD 和结构化 API 之间的主要区别之一是,您对分区的控制不如 RDD 多,您甚至可以定义自定义分区器。这对于 Dataframes 是不可能的。

默认并行度

执行行为配置的文档spark.default.parallelism解释了:

对于没有父 RDD 的并行化等操作,它取决于集群管理器:

本地模式:本地机器上的内核数

Mesos细粒度模式:8

其他:所有执行器节点上的内核总数或 2 个,以较大者为准