Spark job collapses into a single partition, but I can't figure out why

Vit*_*liy 2 apache-spark databricks

I'm trying to tune a Spark job.

I'm running it on Databricks, and at some point I saw this picture:

[Spark UI screenshot: the stage graph, with stage 12 running as a single partition]

Note that in stage 12 I have only one partition, which means there is no parallelism there. How can I work out what causes this? To be clear, there is no "repartition(1)" anywhere in my code.

Adding the (slightly obfuscated) code:

spark.read(cid, location).createOrReplaceTempView("some_parquets")

parquets = spark.profile_paqrquet_df(cid)

parquets.where("year = 2018 and month = 5 and day = 18 and sm_device_source = 'js'") \
        .createOrReplaceTempView("parquets")

# join between two dataframes.  

spark.sql(
    """
        SELECT     {fields}
        FROM       some_parquets  
        WHERE      some_parquets.a = 'js' 
        AND        some_parquets.b = 'normal' 
        AND        date_f >= to_date('2018-05-01') 
        AND        date_f < to_date('2018-05-05')
        limit {limit}
    """.format(limit=1000000, fields=",".join(fields))
).createOrReplaceTempView("some_parquets")

join_result = spark.sql(
    """
        SELECT 
                   parquets.some_field, 
                   struct(some_parquets.*) as some_parquets
        FROM       some_parquets  
        LEFT ANTI JOIN some_ids ON some_parquets.sid = some_ids.sid 
        LEFT OUTER JOIN parquets ON some_parquets.uid = parquets.uid   
    """.format(some_ids=some_ids)
)

# turn items in each partition into vectors for machine learning
vectors = join_result \
    .rdd \
    .mapPartitions(extract)

# write vectors to file system. This evaluates the results
dump_vectors(vectors, output_folder) 
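To see where the parallelism disappears, one minimal check (a sketch reusing the names from the code above) is to print the partition count and the physical plan of an intermediate DataFrame, for example the join result:

# Sketch: locate the step at which the data collapses to one partition.
print(join_result.rdd.getNumPartitions())  # partitions feeding mapPartitions
join_result.explain()                      # physical plan; a limit typically shows up as GlobalLimit/LocalLimit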

Session construction:

spark = SparkSession \
        .builder \
        .appName("...") \
        .config("spark.sql.shuffle.partitions", 1000) \
        .getOrCreate()

小智 7

In case anyone is still interested in the answer: in short, it happens because of the limit clause. Strangely enough, the limit clause collapses the data into a single partition after the shuffle stage.

Just an example run on my local spark-shell:

scala> spark.sql("Select * from temp limit 1").rdd.partitions.size
res28: Int = 1

scala> spark.sql("Select * from temp").rdd.partitions.size
res29: Int = 16
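If parallelism is needed downstream of the limit, a possible workaround in the asker's PySpark setup (a sketch, the partition count is only an example) is to repartition the limited result before registering the view, since the global limit itself is computed on a single partition:

# Sketch: restore parallelism after the limit by repartitioning before
# the view that feeds the join and mapPartitions steps.
# (The original WHERE filters are omitted here for brevity.)
limited = spark.sql(
    """
        SELECT {fields}
        FROM   some_parquets
        limit  {limit}
    """.format(limit=1000000, fields=",".join(fields))
)

# 200 partitions is only an example value; tune it to the cluster.
limited.repartition(200).createOrReplaceTempView("some_parquets")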