我有一个包含超过 10 亿行的 DataFrame (df)
df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)
Run Code Online (Sandbox Code Playgroud)
从上面的命令我了解到我的 100 个工作节点集群 (spark 2.4.5) 中只有 5 个工作节点将执行所有任务。使用coalesce(5) 需要7 个小时才能完成该过程。
我应该尝试repartition而不是吗coalesce?
是否有更快速/有效的方法来写出 128 MB 大小的镶木地板文件,或者我是否需要首先计算数据帧的大小以确定需要多少个分区。
例如,如果我的数据帧的大小为 1 GB 并且spark.sql.files.maxPartitionBytes = 128MB,我应该首先计算No. of partitions required as 1 GB/ 128 MB = approx(8)然后执行 repartition(8) 或合并(8) 吗?
这个想法是在撰写本文时最大化输出中的镶木地板文件的大小,并且能够快速(更快)地做到这一点。
数据框由两列(s3ObjectName,batchName)组成,其中包含数万行,例如:-
| s3对象名称 | 批次名称 |
|---|---|
| a1.json | 45 |
| b2.json | 45 |
| c3.json | 45 |
| d4.json | 46 |
| e5.json | 46 |
目标是使用 foreachPartition() 和 foreach() 函数从 S3 存储桶中检索对象并使用数据帧中每行的详细信息并行写入数据湖
// s3 connector details defined as an object so it can be serialized and available on all executors in the cluster
object container {
def getDataSource() = {
val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
val creds = new BasicAWSCredentials(AccessKey, SecretKey)
val clientRegion: Regions = Regions.US_EAST_1
AmazonS3ClientBuilder.standard()
.withRegion(clientRegion)
.withCredentials(new …Run Code Online (Sandbox Code Playgroud)