如何在PySpark中广播RDD?

4 python-3.x apache-spark pyspark

是否可以在Python中广播RDD?

我正在关注《Advanced Analytics with Spark: Patterns for Learning from Data at Scale》一书,在第 3 章需要广播 RDD。我尝试使用 Python 而不是 Scala 来遵循示例。

无论如何,即使是这个简单的例子我也有一个错误:

my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd)
Run Code Online (Sandbox Code Playgroud)

错误是:

"It appears that you are attempting to broadcast an RDD or reference an RDD from an "
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an
action or transformation. RDD transformations and actions can only be invoked by the driver, n
ot inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) i
s invalid because the values transformation and count action cannot be performed inside of the
 rdd1.map transformation. For more information, see SPARK-5063.
Run Code Online (Sandbox Code Playgroud)

我真的不明白该错误指的是什么“行动或转变”。

我在用spark-2.1.1-hadoop2.7

重要编辑:这本书是正确的。我只是没有读到正在广播的不是 RDD,而是使用collectAsMap() 获得的地图版本。

谢谢!

Jac*_*ski 5

是否可以在Python中广播RDD?

TL;DR

当你思考 RDD到底是什么时,你会发现它根本不可能。RDD 中没有任何内容可以广播。它太脆弱了(可以这么说)。

RDD 是一种描述某些数据集上的分布式计算的数据结构。通过 RDD 的特性,您可以描述计算什么以及如何计算。它是一个抽象的实体。

引用RDD的scaladoc :

表示可以并行操作的不可变的、分区的元素集合

在内部,每个 RDD 都有五个主要属性:

  • 分区列表

  • 用于计算每个分割的函数

  • 对其他 RDD 的依赖列表

  • 可选的,用于键值 RDD 的分区器(例如,RDD 是哈希分区的)

  • (可选)计算每个分割的首选位置列表(例如 HDFS 文件的块位置)

您可以广播的内容不多(引用SparkContext.broadcast方法的 scaladoc):

Broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T]将只读变量广播到集群,返回一个org.apache.spark.broadcast.Broadcast对象以在分布式函数中读取它。该变量将仅发送到每个集群一次。

您只能广播真实值,但 RDD 只是一个值容器,仅在执行器处理其数据时才可用。

来自广播变量

广播变量允许程序员在每台机器上缓存只读变量,而不是随任务传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。

后来在同一个文档中:

这意味着只有当跨多个阶段的任务需要相同的数据或者以反序列化形式缓存数据很重要时,显式创建广播变量才有用。

但是,您可以按如下方式保存collectRDD保存的数据集并广播它:

my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd.collect) // <-- collect the dataset
Run Code Online (Sandbox Code Playgroud)

在“收集数据集”步骤中,数据集离开 RDD 空间并成为本地可用的集合(Python 值),然后可以进行广播。