PySpark:使用binaryFiles()函数读取二进制文件时进行分区

use*_*_19 2 partitioning binaryfiles apache-spark rdd pyspark

sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).partitionBy(8)
Run Code Online (Sandbox Code Playgroud)

要么

sc = SparkContext("Local")
rdd = sc.binaryFiles(Path to the binary file , minPartitions = 5).repartition(8)
Run Code Online (Sandbox Code Playgroud)

使用上述任一代码,我试图在我的RDD中创建8个分区{其中,我希望数据在所有分区上均匀分布}.当我打印{rdd.getNumPartitions()}所示的分区的数目是8只,但在火花UI,我观察到,虽然8个分区由但所有的整个二进制文件数据被放置在仅一个分区.

注意:minPartition属性不起作用.即使在设置minPartitions = 5之后,RDD中创建的分区数也只有1.因此,使用了partitionBy/repartition函数.

这是期望的行为还是我错过了什么?

Mar*_*cok 8

Spark 2.4+,问题应该修复,请参阅@ Rahul在这个答案下方的评论.

Spark 2.1-2.3,minPartitions参数binaryFiles()被忽略.请参阅Spark-16575以及对函数setMinPartitions()提交更改.注意在提交更改中如何minPartitions不再使用该函数!

如果您正在阅读多个二进制文件binaryFiles(),则输入文件将根据以下内容合并到分区中:

  • spark.files.maxPartitionBytes,默认128 MB
  • spark.files.openCostInBytes,默认4 MB
  • spark.default.parallelism
  • 输入的总大小

这里描述前三个配置项.请参阅上面的提交更改以查看实际计算.

我有一个场景,我希望每个输入分区最多40 MB,因此每个任务40 MB ...在解析时增加并行性.(Spark将128 MB放入每个分区,减慢了我的应用程序.)我spark.files.maxPartitionBytes在调用之前设置为40 M binaryFiles():

spark = SparkSession \
   .builder \
   .config("spark.files.maxPartitionBytes", 40*1024*1024)
Run Code Online (Sandbox Code Playgroud)

对于只有一个输入文件,@ user9864979的答案是正确的:单个文件不能使用just分割成多个分区binaryFiles().


使用Spark 1.6读取多个文件时,minPartitions参数确实有效,您必须使用它.如果不这样做,您将遇到Spark-16575问题:所有输入文件只会被读入两个分区!

您会发现Spark通常会比您请求的输入分区更少.我有一个场景,我希望每两个输入二进制文件有一个输入分区.我发现设置minPartitions为"输入文件的数量*7/10"给了我大概我想要的东西.
我有另一个场景,我想为每个输入文件一个输入分区.我发现设置minPartitions为"输入文件数量*2"给了我想要的东西.

Spark 1.5的行为binaryFiles():每个输入文件都有一个分区.

  • 好答案。看来这已在spark 2.4(https://issues.apache.org/jira/browse/SPARK-22357)中修复。同时,将spark.files.maxPartitionBytes设置为较低值似乎是哄骗spark使用多个分区的解决方法。 (2认同)