pyspark在将rdd转换为dataframe时使用mapPartitions的一个任务

Dav*_*vid 14 python apache-spark apache-spark-sql pyspark

我很困惑为什么Spark rdd.mapPartitions在将生成的RDD转换为DataFrame时使用1个任务.

这对我来说是一个问题,因为我想从:

DataFrame- > RDD- > rdd.mapPartitions- >DataFrame

这样我就可以读取数据(DataFrame),将非SQL函数应用于数据块(RDD上的mapPartitions),然后转换回DataFrame,以便我可以使用该DataFrame.write过程.

我可以从DataFrame - > mapPartitions转到然后使用像saveAsTextFile这样的RDD编写器,但这不太理想,因为该DataFrame.write过程可以执行诸如以Orc格式覆盖和保存数据之类的操作.所以我想了解为什么会这样,但从实际的角度来看,我主要关心的是能够从DataFrame - > mapParitions - >到使用DataFrame.write进程.

这是一个可重复的例子.以下按预期工作,包含100项工作任务mapPartitions:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .master("yarn-client") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    return [(1,2)]

spark_df.rdd.mapPartitions(f).collect()
Run Code Online (Sandbox Code Playgroud)

但是,如果最后一行改变为类似的东西,spark_df.rdd.mapPartitions(f).toDF().show()那么mapPartitions工作只会有一个任务.

一些截图说明如下: 在此输入图像描述 在此输入图像描述

sgv*_*gvd 6

DataFrame.show() 仅显示数据帧的第一行数,默认情况下仅显示前20行.如果该数字小于每个分区的行数,则Spark是惰性的,仅评估单个分区,这相当于单个任务.

您还可以collect对数据帧执行操作,计算和收集所有分区,并再次查看100个任务.

您仍然会runJob像以前一样首先看到任务,这是由toDF能够确定结果数据帧架构的调用引起的:它需要处理单个分区才能确定映射函数的输出类型.在这个初始阶段之后,实际的行动collect将发生在所有分区上.例如,对于我运行您的代码段,最后一行替换为spark_df.rdd.mapPartitions(f).toDF().collect()以下阶段的结果:

在此输入图像描述