use*_*360 5 partitioning hadoop-partitioning apache-spark hadoop2 apache-spark-sql
INPUT:
输入数据集包含多个文件中的 1000 万笔交易,以镶木地板形式存储。包括所有文件在内的整个数据集的大小范围为 6 到 8GB。
问题陈述:
根据客户 ID 对交易进行分区,这将为每个客户 ID 创建一个文件夹,每个文件夹包含该特定客户完成的所有交易。
HDFS 对根目录中可以创建的子目录数量有 640 万个硬性限制,因此使用客户 ID 的最后两位数字(范围从 00、01、02...到 99)来创建顶级目录和每个顶级目录将包含所有以该特定两位数字结尾的客户 ID。
示例输出目录结构:
00/cust_id=100900/part1.csv
00/cust_id=100800/part33.csv
01/cust_id=100801/part1.csv
03/cust_id=100803/part1.csv
代码:
// Reading input file and storing in cache
val parquetReader = sparksession.read
.parquet("/inputs")
.persist(StorageLevel.MEMORY_ONLY) //No spill will occur has enough memory
// Logic to partition
var customerIdEndingPattern = 0
while (cardAccountEndingPattern < 100) {
var idEndPattern = customerIdEndingPattern + ""
if (customerIdEndingPattern < 10) {
idEndPattern = "0" + customerIdEndingPattern
}
parquetReader
.filter(col("customer_id").endsWith(idEndPattern))
.repartition(945, col("customer_id"))
.write
.partitionBy("customer_id")
.option("header", "true")
.mode("append")
.csv("/" + idEndPattern)
customerIdEndingPattern = customerIdEndingPattern + 1
}
Run Code Online (Sandbox Code Playgroud)
Spark 配置: Amazon EMR 5.29.0(Spark 2.4.4 和 Hadoop 2.8.5)
1 个主节点和 10 个从节点,每个节点都有 96 个 vCore 和 768GB RAM(Amazon AWS R5.24xlarge 实例)。硬盘是 EBS,具有 3000 IOPS 的半身像,持续 30 分钟。
'spark.hadoop.dfs.replication': '3',
'spark.driver.cores':'5',
'spark.driver.memory':'32g',
'spark.executor.instances': '189',
'spark.executor.memory': '32g',
'spark.executor.cores': '5',
'spark.executor.memoryOverhead':'8192',
'spark.driver.memoryOverhead':'8192',
'spark.default.parallelism':'945',
'spark.sql.shuffle.partitions' :'945',
'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
'spark.dynamicAllocation.enabled': 'false',
'spark.memory.fraction':'0.8',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
'spark.memory.storageFraction':'0.2',
'spark.task.maxFailures': '6',
'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"
'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"
Run Code Online (Sandbox Code Playgroud)
缩放问题:
从 10 到一直到 40 个从站进行实验(相应地调整 spark 配置),但结果仍然相同,作业需要 2 小时以上才能完成(如第一张图片所示,每个作业需要超过一分钟,而 while 循环运行 99次)。此外,来自远程执行程序的读取几乎不存在(这很好),大多数是本地进程。
分区似乎工作正常(请参阅第二张图片)每个实例有 5 个 RDD 块,并且始终运行 5 个任务(每个实例有 5 个内核,每个从节点有 19 个实例)。GC 也进行了优化。
在 while 循环中写入的每个 partitionby 任务都需要一分钟或更长时间才能完成。
指标:
1 个作业的摘要基本上是一个分区通过执行
完整作业完成后的一些实例摘要,因此 RDD 块为零,第一行是驱动程序。
所以问题是如何进一步优化它以及为什么它没有扩大规模?有没有更好的方法来解决它?我已经达到最高性能了吗?假设我可以访问更多的硬件资源,还有什么我可以做得更好的吗?欢迎任何建议。
触摸每条记录 100 次是非常低效的,即使数据可以缓存在内存中并且不会被下游逐出。更何况一个人坚持是昂贵的
相反,您可以添加一个虚拟列
import org.apache.spark.sql.functions.substring
val df = sparksession.read
.parquet("/inputs")
.withColumn("partition_id", substring($"customer_id", -2, 2))
Run Code Online (Sandbox Code Playgroud)
并稍后用于分区
df
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
Run Code Online (Sandbox Code Playgroud)
为了避免许多小文件,您可以先使用更长的后缀重新分区
val nParts: Int = ???
val suffixLength: Int = ??? // >= suffix length used for write partitions
df
.repartitionByRange(
nParts,
substring($"customer_id", -suffixLength, suffixLength)
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
Run Code Online (Sandbox Code Playgroud)
此类更改将允许您一次性处理所有数据,而无需任何显式缓存。
| 归档时间: |
|
| 查看次数: |
297 次 |
| 最近记录: |