jk1*_*jk1 7 scala partitioning apache-spark
所以问题就在主题中。我想我没有正确理解重新分区的工作。在我看来,当我说somedataset.repartition(600)我希望所有数据都将在工作人员之间按相同大小进行划分(假设有 60 个工作人员)。
例如。我会将大量数据加载到不平衡的文件中,比如说 400 个文件,其中 20% 的大小为 2Gb,其他 80% 的大小约为 1Mb。我有加载此数据的代码:
val source = sparkSession.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter","\t")
.load(mypath)
Run Code Online (Sandbox Code Playgroud)
我想将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(具有附加属性),然后按某些列分区并写入镶木地板。在我看来,在工作人员之间平衡数据(40000 个分区)似乎是合理的,而不是像这样进行工作:
val ds: Dataset[FinalObject] = source.repartition(600)
.map(parse)
.filter(filter.IsValid(_))
.map(convert)
.persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")
ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns:_*)
.format("parquet")
.mode(SaveMode.Append)
.save(destUrl)
Run Code Online (Sandbox Code Playgroud)
但它失败了
ExecutorLostFailure(执行程序 7 因正在运行的任务之一而退出) 原因:容器因超出内存限制而被 YARN 终止。已使用 34.6 GB 物理内存或 34.3 GB 物理内存。考虑提高spark.yarn.executor.memoryOverhead。
当我不进行重新分区时,一切都很好。我哪里不明白重新分区正确吗?
您的逻辑是正确的repartition,但partitionBy在使用之前repartition您需要从多个来源记住这一点。
请记住,重新分区数据是一项相当昂贵的操作。Spark还有一个名为coalesce()的repartition()优化版本,它可以避免数据移动,但前提是您要减少RDD分区的数量。
如果您希望您的任务必须完成,请增加驱动程序和执行程序内存