Vit*_*liy · 2 · apache-spark, databricks
I am trying to tune a Spark job.
I am running it on Databricks, and at some point I saw this picture (a screenshot of the Spark UI stages):
Note that in stage 12 I have only one partition, which means there is no parallelism at all. How can I work out the reason for this? To be sure, there is no "repartition(1)" anywhere in my code.
Adding the (slightly obfuscated) code:
spark.read(cid, location).createOrReplaceTempView("some_parquets")

parquets = spark.profile_paqrquet_df(cid)
parquets.where("year = 2018 and month = 5 and day = 18 and sm_device_source = 'js'") \
    .createOrReplaceTempView("parquets")

# Join between the two dataframes.
spark.sql(
    """
    SELECT {fields}
    FROM some_parquets
    WHERE some_parquets.a = 'js'
      AND some_parquets.b = 'normal'
      AND date_f >= to_date('2018-05-01')
      AND date_f < to_date('2018-05-05')
    LIMIT {limit}
    """.format(limit=1000000, fields=",".join(fields))
).createOrReplaceTempView("some_parquets")

join_result = spark.sql(
    """
    SELECT
        parquets.some_field,
        struct(some_parquets.*) AS some_parquets
    FROM some_parquets
    LEFT ANTI JOIN some_ids ON some_parquets.sid = some_ids.sid
    LEFT OUTER JOIN parquets ON some_parquets.uid = parquets.uid
    """.format(some_ids=some_ids)
)

# Turn the items in each partition into vectors for machine learning.
vectors = join_result \
    .rdd \
    .mapPartitions(extract)

# Write the vectors to the file system. This evaluates the results.
dump_vectors(vectors, output_folder)
Session setup:
spark = SparkSession \
    .builder \
    .appName("...") \
    .config("spark.sql.shuffle.partitions", 1000) \
    .getOrCreate()
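To trace where the parallelism drops, two standard checks help, sketched below in PySpark (join_result refers to the DataFrame built above): print the partition count after each step, and inspect the physical plan, where a LIMIT typically shows up as a CollectLimit/GlobalLimit over a single-partition exchange.

# Partition count of an intermediate DataFrame; a sudden drop to 1
# pinpoints the step that collapses the data.
print(join_result.rdd.getNumPartitions())

# The physical plan shows how Spark will execute the query; look for
# an "Exchange SinglePartition" node under a GlobalLimit/CollectLimit.
join_result.explain()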
小智 · 7
In case anyone is still interested in the answer: in short, it happens because of the limit clause. Strangely enough, the limit clause collapses the data into a single partition after the shuffle stage.
Just a sample run on my local spark-shell:
scala> spark.sql("Select * from temp limit 1").rdd.partitions.size
res28: Int = 1
scala> spark.sql("Select * from temp").rdd.partitions.size
res29: Int = 16
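This follows from the physical plan: a global limit has to gather the candidate rows into one task before it can cut them off, so Spark plans an exchange down to a single partition. If the limited result is small enough to reshuffle, a common workaround is to repartition right after the limit. A minimal PySpark sketch (the table name "temp" and the target count 200 are placeholders):

limited = spark.sql("SELECT * FROM temp LIMIT 1000000")
print(limited.rdd.getNumPartitions())    # 1 -- collapsed by the limit

# Repartitioning re-introduces parallelism for downstream joins and
# mapPartitions work, at the cost of one extra shuffle.
respread = limited.repartition(200)
print(respread.rdd.getNumPartitions())   # 200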