当最简单的 Spark 应用程序似乎完成了相同的工作两次时,我陷入了一种奇怪的情况。
应用程序本身执行查询:
SELECT date, field1, field2, ..., field10
FROM table1
WHERE field1 = <some number>
AND date BETWEEN date('2018-05-01') AND date('2018-05-30')
ORDER BY 1
Run Code Online (Sandbox Code Playgroud)
并将结果存储到HDFS中。
表table1是存储在 HDFS 上的一堆 parquet 文件,分区如下
/root/date=2018-05-01/hour=0/data-1.snappy.parquet
/root/date=2018-05-01/hour=0/data-2.snappy.parquet
...
/root/date=2018-05-01/hour=1/data-1.snappy.parquet
...
/root/date=2018-05-02/hour=0/data-1.snappy.parquet
...
etc.
Run Code Online (Sandbox Code Playgroud)
所有 parquet 文件的大小从 700M 到 2G 不等,并且具有相同的架构:10 个非空字段int或bigint类型的非空字段。
应用程序的结果很小——只有几千行。
我的 Spark 应用程序在 YARN 上以集群模式运行。基本火花参数为
spark.driver.memory=2g
spark.executor.memory=4g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.submit.deployMode=cluster
Run Code Online (Sandbox Code Playgroud)
在执行过程中,几个容器被抢占,没有发生错误和故障。整个应用程序一次完成。
Spark UI 截图:
可以看出,第 2 阶段和第 4 阶段都处理了相同数量的输入行,但第 4 阶段也进行了一些改组(这些是结果行)。失败的任务是容器被抢占的任务。
所以看起来我的应用程序处理了相同的文件两次。
我不知道这怎么可能以及发生了什么。请帮助我理解为什么 Spark 会做如此奇怪的事情。
实际物理计划:
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand hdfs://hadoop/root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB, false, CSV, Map(path -> /root/tmp/1530123240802-PrQXaOjPoDqCBhfadgrXBiTtfvFrQRlB), Overwrite, [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- Coalesce 16
+- *(2) Sort [date#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(date#10 ASC NULLS FIRST, 200)
+- *(1) Project [date#10, field1#1L, field0#0L, field3#3L, field2#2L, field5#5, field4#4, field6#6L, field7#7]
+- *(1) Filter (isnotnull(field1#1L) && (field1#1L = 1234567890))
+- *(1) FileScan parquet default.table1[field0#0L,field1#1L,field2#2L,field3#3L,field4#4,field5#5,field6#6L,field7#7,date#10,hour#11] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hadoop/table1], PartitionCount: 714, PartitionFilters: [(date#10 >= 17652), (date#10 <= 17682)], PushedFilters: [IsNotNull(field1), EqualTo(field1,1234567890)], ReadSchema: struct<field0:bigint,field1:bigint,field2:bigint,field3:bigint,field4:int,field5:int,field6:bigint,field7:...
Run Code Online (Sandbox Code Playgroud)
以下是第 2 阶段和第 4 阶段的 DAG:
小智 4
我遇到了同样的问题,事实证明这种行为是完全正常的。
我在一个 Spark 作业中观察到了这种行为,该作业只是从 HDFS 读取,进行一些轻量级处理,并orderBy在写回 HDFS 之前使用该方法对列进行排序。在 Spark UI 中,我看到两个作业将扫描整个 6 TB 表,就像您所做的那样。第一个作业使用的内存非常少,没有写入任何 shuffle 记录,也没有向 HDFS 写入任何记录。
事实证明,根本原因是在实际对数据进行排序之前,Spark 执行了一个采样操作,帮助它定义一个RangePartitioner用于对数据进行分区的排序算法:它需要知道该列中数据的大致范围。定义好排序键RangePartitioner。
这篇博客提到了这个操作:
https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/
这个 StackOverflow 帖子:
以及 Holden Karau 和 Rachel Warran 所著的伟大著作《High Performance Spark》,第 14 页。143.
就我而言,我知道键的范围,所以我想到原则上我应该能够定义RangePartitioner 先验。然而,我在 Spark 源代码中挖掘了它的sort方法,但没有找到任何可以显式传递范围的解决方法。
| 归档时间: |
|
| 查看次数: |
3271 次 |
| 最近记录: |