Spark分区(ing)如何处理HDFS中的文件?

Deg*_*get 47 hdfs apache-spark

我正在使用HDFS在集群上使用Apache Spark.据我了解,HDFS正在数据节点上分发文件.因此,如果在文件系统上放置"file.txt",它将被拆分为分区.现在我在打电话

rdd = SparkContext().textFile("hdfs://.../file.txt") 
Run Code Online (Sandbox Code Playgroud)

来自Apache Spark.rdd现在自动与文件系统上的"file.txt"分区相同吗?我打电话时会发生什么

rdd.repartition(x)
Run Code Online (Sandbox Code Playgroud)

其中x>那么hdfs使用的分区?Spark会在物理上重新排列hdfs上的数据以在本地工作吗?

示例:我在HDFS系统上放置了一个30GB的文本文件,它将它分发到10个节点上.Will Spark a)使用相同的10个分区吗?和b)当我调用重新分区(1000)时,在群集中洗牌30GB?

0x0*_*FFF 84

当Spark从HDFS读取文件时,它会为单个输入拆分创建单个分区.输入拆分由InputFormat用于读取此文件的Hadoop设置.例如,如果您使用textFile()它将TextInputFormat在Hadoop中,它将为单个HDFS块返回单个分区(但分区之间的分割将在线分割,而不是精确的块分割),除非您有压缩文本文件.如果是压缩文件,您将获得单个文件的单个分区(因为压缩文本文件不可拆分).

当你调用rdd.repartition(x)它时,它会执行从N你所拥有的分区rddx你想要的分区的数据的混乱,分区将在循环的基础上进行.

如果你有一个30GB的未压缩文本文件存储在HDFS上,那么使用默认的HDFS块大小设置(128MB)它将存储在235个块中,这意味着你从这个文件读取的RDD将有235个分区.当你调用repartition(1000)你的RDD时会被标记为重新分区,但实际上只有在你将在这个RDD(懒惰执行概念)之上执行一个动作时它才会被拖拽到1000个分区

  • 不完全是.理想情况下,您将获得与HDFS中相同数量的块.但是如果文件中的行太长(比块大小长),则分区数量会更小.在这种情况下,更改分区数的首选方法是直接将其传递给调用`rdd = SparkContext().textFile("hdfs://.../file.txt",400)`,其中400是分区数量.这种情况下,400分割的分区将由Hadoop TextInputFormat完成,而不是Spark,它可以更快地工作.Spark`repartition()`会在整个集群中对数据进行洗牌,效率不高 (12认同)
  • 那么,我是否正确地从中获取`rdd = SparkContext().textFile("hdfs://.../file.txt")`将导致RDD被划分为与块相同数量的块文件存储在HDFS中?如果你修复它`rdd.repartition(x)`,它只能进入更多的分区? (2认同)

mrs*_*vas 11

这里是快照" 如何在HDFS块被装载到星火工人分区 "

在此图像中,4个HDFS块作为Spark分区加载到3个工作器内存中

HDFS中的数据集分解为分区


示例:我在HDFS系统上放置了一个30GB的文本文件,它将它分发到10个节点上.

威尔·斯帕克

a)使用相同的10个分区?

Spark将相同的10个HDFS块作为分区加载到工作者内存中.我假设30 GB文件的块大小应为3 GB,以获得10个分区/块(默认配置)

b)当我呼叫重新分区(1000)时,在群集中洗牌30GB?

是的,Spark在工作节点之间对数据进行洗牌,以便在工作器内存中创建1000个分区.

注意:

HDFS Block -> Spark partition   : One block can represent as One partition (by default)
Spark partition -> Workers      : Many/One partitions can present in One workers 
Run Code Online (Sandbox Code Playgroud)


Apo*_*ave 10

使用spark-sql读取非分段的HDFS文件(例如镶木地板)时,DataFrame分区的数量df.rdd.getNumPartitions取决于以下因素:

  • spark.default.parallelism (粗略地转换为可用于应用程序的#cores)
  • spark.sql.files.maxPartitionBytes (默认128MB)
  • spark.sql.files.openCostInBytes (默认4MB)

对分区数量的粗略估计是:

  • 如果您有足够的内核来并行读取所有数据(即每128MB数据至少有一个核心)

    AveragePartitionSize ? min(4MB, TotalDataSize/#cores) NumberOfPartitions ? TotalDataSize/AveragePartitionSize

  • 如果你没有足够的核心,

    AveragePartitionSize ? 128MB NumberOfPartitions ? TotalDataSize/AveragePartitionSize

精确的计算有点复杂,可以在FileSourceScanExec的代码库中找到,请参阅此处.


Chi*_*iku 6

添加到@ 0x0FFF如果它从HDFS作为输入文件,它将像这样计算 rdd = SparkContext().textFile("hdfs://.../file.txt"),当你这样做rdd.getNumPatitions时将导致Max(2, Number of HDFS block).我做了很多实验并发现了这个结果.再次明确地说,您可以rdd = SparkContext().textFile("hdfs://.../file.txt", 400)将400作为分区,甚至可以通过重新分区 rdd.repartition或减少到10rdd.coalesce(10)