SparkReduce函数:了解它是如何工作的

use*_*622 5 reduce apache-spark

我正在学习这门课程

它说 RDD 上的归约操作一次在一台机器上完成。这意味着,如果您的数据分布在两台计算机上,那么以下函数将处理第一台计算机中的数据,找到该数据的结果,然后将从第二台计算机中获取单个值,运行该函数,然后它将继续如此直到完成机器 2 中的所有值。这是正确的吗?

我认为该函数将同时在两台机器上开始运行,然后一旦获得两台机器的结果,它将再次最后一次运行该函数

rdd1=rdd.reduce(lambda x,y: x+y)
Run Code Online (Sandbox Code Playgroud)

更新1--------------------------------------------------------

与reduce函数相比,下面的步骤会给出更快的答案吗?

Rdd=[3,5,4,7,4]
seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
collData.aggregate(0, seqOp, combOp)
Run Code Online (Sandbox Code Playgroud)

更新2------------------------------------------------

下面的两组代码是否应该在相同的时间内执行?我检查了一下,似乎两者花费的时间相同。

import datetime

data=range(1,1000000000)
distData = sc.parallelize(data,4)
print(datetime.datetime.now())
a=distData.reduce(lambda x,y:x+y)
print(a)
print(datetime.datetime.now())

seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
print(datetime.datetime.now())
b=distData.aggregate(0, seqOp, combOp)
print(b)
print(datetime.datetime.now())
Run Code Online (Sandbox Code Playgroud)

zer*_*323 3

reduce本机语言 (Scala) 和来宾语言 (Python) 之间的行为略有不同,但稍微简化一下:

  • 每个分区按顺序逐个元素进行处理
  • 多个分区可以由单个工作线程(多个执行线程)或不同的工作线程同时处理
  • 部分结果被提取到应用最终归约的驱动程序(这是在 PySpark 和 Scala 中具有不同实现的部分)

因为看起来您正在使用 Python,所以让我们看一下代码:

  1. reduce 为用户提供的函数创建一个简单的包装器:

    def func(iterator):
        ...
    
    Run Code Online (Sandbox Code Playgroud)
  2. 这是包装器,用于mapPartitions

    vals = self.mapPartitions(func).collect()
    
    Run Code Online (Sandbox Code Playgroud)

    很明显,这段代码是令人尴尬的并行,并且不关心如何使用结果

  3. vals使用标准 Python 在驱动程序上按顺序减少收集的数据reduce

    reduce(f, vals)
    
    Run Code Online (Sandbox Code Playgroud)

    其中f是传递给的函数RDD.reduce

相比之下,Scala 会异步合并来自工作线程的部分结果。

在步骤 3 的情况下treeReduce,也可以以分布式方式执行。请参阅了解 Spark 中的 treeReduce()

总而言之,排除驱动程序端处理,使用与或等基本转换reduce完全相同的机制 ( ) ,并提供相同级别的并行性(再次排除驱动程序代码)。如果您有大量分区或者成本昂贵,您可以使用一系列方法进行并行/分布式最终合并。mapPartitionsmapfilterftree*