Sid*_*ant 4 apache-spark apache-spark-sql pyspark
我正在尝试使用 Spark 处理集群上的 csv 文件。我想了解是否需要显式读取每个工作节点上的文件以并行处理,或者驱动程序节点是否会读取文件并跨集群分发数据以进行内部处理?(我正在使用 Spark 2.3.2 和 Python)
我知道 RDD 可以使用 SparkContext.parallelize() 进行并行化,但是在 Spark DataFrames 的情况下呢?
if __name__=="__main__":
spark=SparkSession.builder.appName('myApp').getOrCreate()
df=spark.read.csv('dataFile.csv',header=True)
df=df.filter("date>'2010-12-01' AND date<='2010-12-02' AND town=='Madrid'")
Run Code Online (Sandbox Code Playgroud)
因此,如果我在集群上运行上面的代码,整个操作是由驱动程序节点完成的,还是会跨集群分发 df 并且每个工作人员在其数据分区上执行处理?
小智 10
严格地说,如果你运行上面的代码,它不会读取或处理任何数据。DataFrame 基本上是在 RDD 之上实现的抽象。与 RDD 一样,您必须区分转换和操作。由于您的代码仅包含一个filter(...)转换,因此在读取或处理数据方面会发生注意。Spark 只会创建作为执行计划的 DataFrame。您必须执行类似count()或write.csv(...)实际触发 CSV 文件处理的操作。
如果这样做,数据将被 1..n 个工作节点读取和处理。驱动程序节点永远不会读取或处理它。在您的代码中,实际涉及多少或您的工作节点取决于源文件的分区数。源文件的每个分区都可以由一个工作节点并行处理。在您的示例中,它可能是一个 CSV 文件,因此当您df.rdd.getNumPartitions()在阅读文件后调用时,它应该返回1. 因此,只有一个工作节点会读取数据。如果您在filter(...)操作后检查分区数也是如此。
以下是并行处理单个 CSV 文件的两种方法:
你可以通过调用手动重新分区源数据框df.repartition(n)与n你想拥有的分区数目。但是——这是一个重要的但是——这意味着所有数据都可能通过网络发送(又名洗牌)!
您在 DataFrame 上执行聚合或连接。这些操作必须触发 shuffle。然后,Spark 使用spark.sql.shuffle.partitions(默认值:200)中指定的分区数对生成的 DataFrame 进行分区。
| 归档时间: |
|
| 查看次数: |
1867 次 |
| 最近记录: |