为什么 Spark 会为一项操作创建多个作业?

Nas*_*sia 9 python apache-spark pyspark databricks

我注意到,当仅使用一个操作启动这堆代码时,我启动了三个作业。

\n
from typing import List\nfrom pyspark.sql import DataFrame\nfrom pyspark.sql.types import StructType, StructField, StringType\nfrom pyspark.sql.functions import avg\n\ndata: List = [("Diamant_1A", "TopDiamant", "300", "rouge"),\n    ("Diamant_2B", "Diamants pour toujours", "45", "jaune"),\n    ("Diamant_3C", "Mes diamants pr\xc3\xa9f\xc3\xa9r\xc3\xa9s", "78", "rouge"),\n    ("Diamant_4D", "Diamants que j\'aime", "90", "jaune"),\n    ("Diamant_5E", "TopDiamant", "89", "bleu")\n  ]\n\nschema: StructType = StructType([ \\\n    StructField("reference", StringType(), True), \\\n    StructField("marque", StringType(), True), \\\n    StructField("prix", StringType(), True), \\\n    StructField("couleur", StringType(), True)\n  ])\n\ndataframe: DataFrame = spark.createDataFrame(data=data,schema=schema)\n\ndataframe_filtree:DataFrame = dataframe.filter("prix > 50")\n\ndataframe_filtree.show()\n
Run Code Online (Sandbox Code Playgroud)\n

根据我的理解,我应该只得到一个。一项操作对应一项作业。\n我正在使用 Databricks。这可能是问题所在。我有 2 个问题:

\n
    \n
  • 为什么我有 3 份工作而不是 1 份?
  • \n
  • 我可以改变这种行为吗?
  • \n
\n

这是第一份工作:\n第一个达格

\n

这是第二个:\n达格的第二份工作

\n

最后一个:\n达格第三份工作

\n

fig*_*uts 10

1 个操作对应 1 个工作为 True。这里需要注意的是,这在 RDD API 中是正确的

Dataframe 和 Dataset API 是 RDD API 之上的一个抽象层,可以让您的生活更轻松。有时,在调用某个操作时,它会在内部触发多个操作,并且您会看到多个作业

一个例子是用header=True. 当您调用下游操作时,它将触发另一个内部操作,该内部操作读取 csv 的第一行以推断标题,您将看到它显示为作业

另一个原因是自适应查询执行。spark.sql.adaptive.enabled使用阶段统计数据设置True引导火花,并根据这些统计数据决定后续的物理计划。这在最终用户不必担心 Spark 连接中的倾斜等情况下非常有用。然而,这会导致火花将工作分解为许多工作。您将这些视为作业 DAG 中先前作业的跳过阶段。如果你设置spark.sql.adaptive.enabledFalse,你会看到所有这些工作都消失了。但是,您几乎总是希望使用自适应查询执行


try*_*ata 1

@Nastasia 虽然我找不到上述问题的答案,但我想给出我在 8 核 CPU 系统中的一些发现:

  1. 上面的程序没有任何改变:

    No. of Jobs - 3  , 
    No. Tasks  in each Job - 3 , 4 , 1 (because default no. of partitions is 8 )
    
    Run Code Online (Sandbox Code Playgroud)
  2. 在您的示例中添加几行,如下所示,这可以提供一些理解

     print(dataframe.rdd.getNumPartitions())
     dataframe2 = dataframe.coalesce(1)
     print(dataframe2.rdd.getNumPartitions())
     dataframe_filtree:DataFrame = dataframe2.filter("prix > 50")
     dataframe_filtree.show()
    
    Run Code Online (Sandbox Code Playgroud)
    No. of Jobs - 1 ,   
    No. Tasks  in each Job - 1 (because  no. of partitions is 1 )
    
    Run Code Online (Sandbox Code Playgroud)
  3. 更改合并中的参数给出了以下结果:

    coalesce(2): 
    No. of Jobs - 2 (why not 1 ?) , 
    No. Tasks  in each Job - 1,1 
    
    coalesce(6):
    No. of Jobs - 3 (why not 1 or 2) , 
    No. Tasks  in each Job - 1,4,1
    
    Run Code Online (Sandbox Code Playgroud)

显然“分区数量”是这里的一个因素。但还有其他一些因素决定了不。的工作机会。