Avi*_*rya 29 bigdata apache-spark rdd spark-dataframe apache-spark-2.0
我正在尝试利用spark分区.我试图做类似的事情
data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.
为了避免我试过
data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.
写入后我应该如何使用分区来避免许多文件?
Rap*_*oth 43
首先我真的会避免使用coalesce,因为这通常会在转换链中被进一步推升,并可能破坏你工作的并行性(我在这里问到了这个问题:如何防止Spark优化)
每个镶木地板分区写1个文件非常容易(参见Spark数据帧写入方法编写许多小文件):
data.repartition($"key").write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
如果要设置任意数量的文件(或具有相同大小的文件),则需要使用可能使用的其他属性进一步重新分区数据(我无法告诉您在您的情况下可能是什么):
data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
another_key可以是数据集的另一个属性,也可以是对现有属性使用某些模运算或舍入运算的派生属性.你甚至可以使用窗口功能与row_number在key通过像,然后这个圆
data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这会将你的N记录放入1个镶木地板文件中
使用orderBy
您还可以通过相应地对数据框进行排序来控制文件数量而无需重新分区:
data.orderBy($"key").write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这将导致spark.sql.shuffle.partitions所有分区的总数(默认为200).之后添加第二个排序列甚至是有益的$key,因为镶木地板将记住数据帧的顺序并将相应地写入统计数据.例如,您可以按ID订购:
data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
这不会改变文件的数量,但是当您查询给定的key和/的镶木地板文件时,它会提高性能id.参见例如https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide 和https://db-blog.web.cern.ch/blog/luca-canali/2017-06-潜水-火花和拼花工作负载-例如
Spark 2.2+
从Spark 2.2开始,您还可以使用新选项maxRecordsPerFile来限制每个文件的记录数.如果您有N个分区,您仍将获得至少N个文件,但您可以将1个分区(任务)写入的文件拆分为更小的块:
df.write
.option("maxRecordsPerFile", 10000)
...
Run Code Online (Sandbox Code Playgroud)
参见例如http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/和spark写入磁盘,N个文件少于N个分区
Pow*_*ers 12
让我们用另一种方法扩展 Raphael Roth 的答案,该方法将创建每个分区可以包含的文件数的上限,如本答案所述:
import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
Run Code Online (Sandbox Code Playgroud)
这里的其他答案都很好,但有一些问题:
将maxRecordsPerFile大分区分解为较小的文件非常方便,但有两个注意事项:
如果您的分区列严重倾斜,则通过它们重新分区意味着可能会将最大数据分区的所有数据移动到单个 DataFrame 分区中。如果该 DataFrame 分区变得太大,仅此一项就可能会导致您的工作崩溃。
举一个简单的例子,想象一下repartition("country")对于世界上每个人都有 1 行的 DataFrame 会做什么。
maxRecordsPerFile将确保您的输出文件不超过一定的行数,但只有一个任务能够连续写出这些文件。一项任务必须处理整个数据分区,而不是能够通过多个任务写出大型数据分区。
repartition(numPartitions, $"some_col", rand)是一个优雅的解决方案,但不能很好地处理小数据分区。它会为每个数据分区写出numPartitions文件,即使它们很小。
在许多情况下,这可能不是问题,但如果您有一个大型数据湖,您就会知道,随着时间的推移,写出许多小文件会降低数据湖的性能。
因此,一种解决方案不适用于非常大的数据分区,而另一种解决方案则不适用于非常小的数据分区。
我们需要的是一种根据数据分区的大小动态缩放输出文件数量的方法。如果它很大,我们需要很多文件。如果它很小,我们只需要几个文件,甚至只需要一个文件。
解决方案是使用该数据分区的所需输出文件数量来扩展该方法repartition(..., rand)并动态缩放其范围。rand
# In this example, `id` is a column in `skewed_data`.
partition_by_columns = ['id']
desired_rows_per_output_file = 10
partition_count = skewed_data.groupBy(partition_by_columns).count()
partition_balanced_data = (
skewed_data
.join(partition_count, on=partition_by_columns)
.withColumn(
'repartition_seed',
(
rand() * partition_count['count'] / desired_rows_per_output_file
).cast('int')
)
.repartition(*partition_by_columns, 'repartition_seed')
)
Run Code Online (Sandbox Code Playgroud)
这将平衡输出文件的大小,无论分区倾斜如何,并且不会限制并行性或为小分区生成太多小文件。
如果您想自己运行此代码,我提供了一个独立的示例,以及 DataFrame 分区正确平衡的证明。
这对我来说非常有效:
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)
它在每个输出分区(目录)中生成 N 个文件,并且(据说)比使用更快,coalesce 并且(再次,在我的数据集上)比仅在输出上重新分区更快。
如果您使用 S3,我还建议在本地驱动器上执行所有操作(Spark 在写出期间执行大量文件创建/重命名/删除操作),一旦全部解决,请使用 hadoop(或只是FileUtilaws cli)将所有内容复制过来:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
// ...
def copy(
in : String,
out : String,
sparkSession: SparkSession
) = {
FileUtil.copy(
FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
new Path(in),
FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
new Path(out),
false,
sparkSession.sparkContext.hadoopConfiguration
)
}
Run Code Online (Sandbox Code Playgroud)
编辑:根据评论中的讨论:
您有一个分区列为 YEAR 的数据集,但每个给定的 YEAR 中的数据量差异很大。因此,一年可能有 1GB 的数据,但另一年可能有 100GB。
这是处理此问题的一种方法的伪代码:
val partitionSize = 10000 // Number of rows you want per output file.
val yearValues = df.select("YEAR").distinct
distinctGroupByValues.each((yearVal) -> {
val subDf = df.filter(s"YEAR = $yearVal")
val numPartitionsToUse = subDf.count / partitionSize
subDf.repartition(numPartitionsToUse).write(outputPath + "/year=$yearVal")
})
Run Code Online (Sandbox Code Playgroud)
但是,我实际上不知道这会起到什么作用。Spark 在读取每个列分区的可变数量的文件时可能会出现问题。
另一种方法是编写您自己的自定义分区器,但我不知道其中涉及什么,所以我无法提供任何代码。
| 归档时间: |
|
| 查看次数: |
26208 次 |
| 最近记录: |