Ale*_*dro 5 apache-spark apache-spark-sql pyspark
我正在尝试左连接 Spark 3 上的 2 个表,其中包含 17M 行(事件)和 400M 行(详细信息)。拥有一个包含 1 + 15 x 64 核实例的 EMR 集群。(r6g.16xlarge 尝试使用类似的 r5a)源文件是从 S3 加载的未分区镶木地板。
这是我用来加入的代码:
join = (
broadcast(events).join(
details,
[
details["a"] == events["a2"],
(unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
> 5,
],
"left",
)
).drop("a")
join.checkpoint()
Run Code Online (Sandbox Code Playgroud)
为了分区,我使用这个:
executors = 15 * 64 * 3 # 15 instances, 64 cores, 3 workers per core
Run Code Online (Sandbox Code Playgroud)
所以我尝试了:
details = details.repartition(executors, "a")
Run Code Online (Sandbox Code Playgroud)
和
details = details.withColumn("salt", (rand(seed=42) * nSaltBins).cast("int"))
details = details.repartition(executors, "salt")
Run Code Online (Sandbox Code Playgroud)
在这两种情况下,90%的工作人员在大约 5-10 分钟内结束,其余的继续很长一段时间(50 分钟以上),长绿线,日志上没有内存或磁盘错误。
分区后存在一点偏差(所有分区在 180k 到 160k 行之间),没有任何原因会占用超过 50 分钟的处理器时间。
知道我可以监督什么吗?读了很多帖子,仍然觉得绿线(工人时间)彼此之间应该更近,它们都是同时开始的,他们不等待工人结束。
谢谢!
--编辑--- 删除广播
在作业 11,第 17 阶段,它在 2 分钟内完成了 974/1000,30 分钟后仍然完成了 993/1000,上一步使用了 salted 分区(由 executors 变量给出),并且速度非常快。
执行计划:
Using 17906254 events
== Physical Plan ==
AdaptiveSparkPlan (13)
+- Project (12)
+- SortMergeJoin LeftOuter (11)
:- Sort (4)
: +- Exchange (3)
: +- Project (2)
: +- Scan parquet (1)
+- Sort (10)
+- Exchange (9)
+- Exchange (8)
+- Project (7)
+- Filter (6)
+- Scan parquet (5)
Run Code Online (Sandbox Code Playgroud)
当前火花配置:
spark = SparkSession.builder.appName('Test').config("spark.driver.memory", "108g").config(
"spark.executor.instances", "59").config("spark.executor.memoryOverhead", "13312").config(
"spark.executor.memory", "108g").config("spark.executor.cores", "15").config("spark.driver.cores", "15").config(
"spark.default.parallelism", "1770").config("spark.sql.adaptive.enabled", "true").config(
"spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "885").getOrCreate()
Run Code Online (Sandbox Code Playgroud)
您的问题看起来像是倾斜连接的一个很好的例子,其中某些分区将比其他分区获得更多的数据,从而减慢整个作业的速度。
在连接之前重新分区数据帧不会有帮助,因为 SortMergeJoin 操作将在连接键上再次重新分区以处理连接
由于您使用的是 Spark 3,因此您应该支持自动 skewJoin 管理。
要使用它,请确保您同时拥有spark.sql.adaptive.enabled=true(在标准 Spark 发行版中默认为 false)和spark.sql.adaptive.skewJoin.enabled=true
如果您无法使用自动 skewJoin 优化,您可以使用以下方法手动修复它:
n = 10 # Chose an appropriate amount based on skewness
skewedEvents = events.crossJoin(spark.range(0,n).withColumnRenamed("id","eventSalt"))
Run Code Online (Sandbox Code Playgroud)
import pyspark.sql.functions as f
skewedDetails = details.withColumn("detailSalt", (f.rand() * n).cast("int"))
Run Code Online (Sandbox Code Playgroud)
joined = skewedEvents.join(skewedDetails,[ [
skewedDetails["a"] == skewedEvents["a2"],
skewedDetails["detailSalt"] == skewedEvents["eventSalt"],
(unix_timestamp(skewedEvents["date"]) - unix_timestamp(skewedDetails["date"])) / 3600
> 5,
],
"left")\
.filter("a is not null or (a is null and eventSalt = 0)")\
.drop("a").drop("eventSalt").drop("detailSalt")
Run Code Online (Sandbox Code Playgroud)
请注意,还可能需要验证您的查询连接条件,因为 UI 显示在详细信息上处理了 3.33 亿行,在事件上处理了 1700 万行,您生成了超过 50 亿的输出行,因此您可能会匹配更多您认为与连接条件相关的行。
| 归档时间: |
|
| 查看次数: |
10795 次 |
| 最近记录: |