Pau*_*ers 12 scala apache-spark apache-spark-sql
当我将数据帧写入csv时,会为每个分区创建一个.csv文件.假设我想将每个文件的最大大小限制为1 MB.我可以多次写入并且每次都增加参数以重新分区.有没有办法可以提前计算用于重新分区的参数,以确保每个文件的最大大小小于某个指定的大小.
我想可能存在病理情况,其中所有数据最终都在一个分区上.因此,做出较弱的假设,我们只想确保平均文件大小小于某个指定的数量,比如1 MB.
Ata*_*ais 13
我试图找出一些不会同时杀死集群的聪明主意,而我脑海中唯一想到的是:
代码看起来应该更像这样:
val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // million bytes in MB?
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv
// just helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(stream)
oos.writeObject(value)
oos.close
stream.toByteArray.length
}
Run Code Online (Sandbox Code Playgroud)
虽然我的第一选择是计算每一行的字节大小,但这将是非常低效的.因此,除非每行中的数据大小差异很大,否则我会说这个解决方案会起作用.您还可以计算每第n行的大小.你明白了.
此外,我只是'希望' Long足够大,以支持预期的计算大小noPartitions.如果没有(如果你有很多行),也许更改操作顺序会更好,fe:
val noPartitions: Int = (rowSize / partitionSize * rowCount).toInt
Run Code Online (Sandbox Code Playgroud)
再次,这只是一个草拟的想法,没有关于您的数据的领域知识.
在浏览apache-spark文档时,我发现了一个有趣的跨系统解决方案:
spark.sql.files.maxPartitionBytes
哪些设置:
读取文件时打包到单个分区的最大字节数.
默认值为134217728 (128 MB).
所以我想你可以设置它1000000 (1MB),它将对你的产生永久性影响DataFrames.但是,太小的分区大小可能会极大地影响您的性能!
您可以在SparkSession创建期间进行设置:
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.sql.files.maxPartitionBytes", 100000)
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
以上所有内容仅在(我记得正确并且)csv使用与DataFrame的分区相同数量的文件进行分区时才有效.
| 归档时间: |
|
| 查看次数: |
1898 次 |
| 最近记录: |