pyspark加入缓慢,尝试重新分区

Ale*_*dro 5 apache-spark apache-spark-sql pyspark

我正在尝试左连接 Spark 3 上的 2 个表,其中包含 17M 行(事件)和 400M 行(详细信息)。拥有一个包含 1 + 15 x 64 核实例的 EMR 集群。(r6g.16xlarge 尝试使用类似的 r5a)源文件是从 S3 加载的未分区镶木地板。

这是我用来加入的代码:

join = (
    broadcast(events).join(
        details,
        [
            details["a"] == events["a2"],
            (unix_timestamp(events["date"]) - unix_timestamp(details["date"])) / 3600
            > 5,
        ],
        "left",
    )
).drop("a")

join.checkpoint()
Run Code Online (Sandbox Code Playgroud)

为了分区,我使用这个:

executors = 15 * 64 * 3  # 15 instances, 64 cores, 3 workers per core
Run Code Online (Sandbox Code Playgroud)

所以我尝试了:

details = details.repartition(executors, "a")
Run Code Online (Sandbox Code Playgroud)

details = details.withColumn("salt", (rand(seed=42) * nSaltBins).cast("int"))
details = details.repartition(executors, "salt")
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,90%的工作人员在大约 5-10 分钟内结束,其余的继续很长一段时间(50 分钟以上),长绿线,日志上没有内存或磁盘错误。

分区后存在一点偏差(所有分区在 180k 到 160k 行之间),没有任何原因会占用超过 50 分钟的处理器时间。

知道我可以监督什么吗?读了很多帖子,仍然觉得绿线(工人时间)彼此之间应该更近,它们都是同时开始的,他们不等待工人结束。

谢谢!

--编辑--- 删除广播

在作业 11,第 17 阶段,它在 2 分钟内完成了 974/1000,30 分钟后仍然完成了 993/1000,上一步使用了 salted 分区(由 executors 变量给出),并且速度非常快。

执行计划:

Using 17906254 events
== Physical Plan ==
AdaptiveSparkPlan (13)
+- Project (12)
   +- SortMergeJoin LeftOuter (11)
      :- Sort (4)
      :  +- Exchange (3)
      :     +- Project (2)
      :        +- Scan parquet  (1)
      +- Sort (10)
         +- Exchange (9)
            +- Exchange (8)
               +- Project (7)
                  +- Filter (6)
                     +- Scan parquet  (5)
Run Code Online (Sandbox Code Playgroud)

作为图像

2 小时或 25% 以上的时间的示例是剩余 1 个执行者 在此输入图像描述

当前火花配置:

spark = SparkSession.builder.appName('Test').config("spark.driver.memory", "108g").config(
        "spark.executor.instances", "59").config("spark.executor.memoryOverhead", "13312").config(
        "spark.executor.memory", "108g").config("spark.executor.cores", "15").config("spark.driver.cores", "15").config(
        "spark.default.parallelism", "1770").config("spark.sql.adaptive.enabled", "true").config(
        "spark.sql.adaptive.skewJoin.enabled", "true").config("spark.sql.shuffle.partitions", "885").getOrCreate()
Run Code Online (Sandbox Code Playgroud)

rlu*_*uta 7

您的问题看起来像是倾斜连接的一个很好的例子,其中某些分区将比其他分区获得更多的数据,从而减慢整个作业的速度。

在连接之前重新分区数据帧不会有帮助,因为 SortMergeJoin 操作将在连接键上再次重新分区以处理连接

由于您使用的是 Spark 3,因此您应该支持自动 skewJoin 管理

要使用它,请确保您同时拥有spark.sql.adaptive.enabled=true(在标准 Spark 发行版中默认为 false)和spark.sql.adaptive.skewJoin.enabled=true

如果您无法使用自动 skewJoin 优化,您可以使用以下方法手动修复它:

  • 将小数据集重复 N 次
n = 10   # Chose an appropriate amount based on skewness
skewedEvents = events.crossJoin(spark.range(0,n).withColumnRenamed("id","eventSalt"))
Run Code Online (Sandbox Code Playgroud)
  • 使用 0 到 N 之间的随机列值播种大型数据集
import pyspark.sql.functions as f

skewedDetails = details.withColumn("detailSalt", (f.rand() * n).cast("int"))
Run Code Online (Sandbox Code Playgroud)
  • 在连接键中使用您的盐进行连接,然后删除盐
joined = skewedEvents.join(skewedDetails,[         [
            skewedDetails["a"] == skewedEvents["a2"],
            skewedDetails["detailSalt"] == skewedEvents["eventSalt"],
            (unix_timestamp(skewedEvents["date"]) - unix_timestamp(skewedDetails["date"])) / 3600
            > 5,
        ],
        "left")\
        .filter("a is not null or (a is null and eventSalt = 0)")\
        .drop("a").drop("eventSalt").drop("detailSalt")
Run Code Online (Sandbox Code Playgroud)

请注意,还可能需要验证您的查询连接条件,因为 UI 显示在详细信息上处理了 3.33 亿行,在事件上处理了 1700 万行,您生成了超过 50 亿的输出行,因此您可能会匹配更多您认为与连接条件相关的行。