限制数据帧分区的最大大小

Pau*_*ers 12 scala apache-spark apache-spark-sql

当我将数据帧写入csv时,会为每个分区创建一个.csv文件.假设我想将每个文件的最大大小限制为1 MB.我可以多次写入并且每次都增加参数以重新分区.有没有办法可以提前计算用于重新分区的参数,以确保每个文件的最大大小小于某个指定的大小.

我想可能存在病理情况,其中所有数据最终都在一个分区上.因此,做出较弱的假设,我们只想确保平均文件大小小于某个指定的数量,比如1 MB.

Ata*_*ais 13

1.单数据帧解决方案

我试图找出一些不会同时杀死集群的聪明主意,而我脑海中唯一想到的是:

  1. 计算序列化行的大小
  2. 得到不.您的DataFrame中的行数
  3. 重新分配,除以预期的大小
  4. 应该管用?

代码看起来应该更像这样:

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // million bytes in MB?
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv

// just helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close
  stream.toByteArray.length
}
Run Code Online (Sandbox Code Playgroud)

虽然我的第一选择是计算每一行的字节大小,但这将是非常低效的.因此,除非每行中的数据大小差异很大,否则我会说这个解决方案会起作用.您还可以计算每第n行的大小.你明白了.

此外,我只是'希望' Long足够大,以支持预期的计算大小noPartitions.如果没有(如果你有很多行),也许更改操作顺序会更好,fe:

val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt
Run Code Online (Sandbox Code Playgroud)

再次,这只是一个草拟的想法,没有关于您的数据的领域知识.

2.跨系统解决方案

在浏览apache-spark文档时,我发现了一个有趣的跨系统解决方案:

spark.sql.files.maxPartitionBytes 哪些设置:

读取文件时打包到单个分区的最大字节数.

默认值为134217728 (128 MB).

所以我想你可以设置它1000000 (1MB),它将对你的产生永久性影响DataFrames.但是,太小的分区大小可能会极大地影响您的性能!

您可以在SparkSession创建期间进行设置:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 100000)
  .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

以上所有内容仅在(我记得正确并且)csv使用与DataFrame的分区相同数量的文件进行分区时才有效.