RDD的切片和分区之间有什么区别?

use*_*023 12 python apache-spark

我正在使用Spark的Python API并运行Spark 0.8.

我正在存储一个大的RDD浮点向量,我需要对整个集合执行一个向量的计算.

RDD中的切片和分区之间有什么区别吗?

当我创建RDD时,我将其作为参数传递100,这使得它将RDD存储为100个切片并在执行计算时创建100个任务.我想知道,通过使系统更有效地处理数据,分区数据是否会提高切片之外的性能(即,在分区上执行操作与在切片RDD中的每个元素上操作之间是否存在差异).

例如,这两段代码之间是否有任何显着差异?

rdd = sc.textFile(demo.txt, 100)
Run Code Online (Sandbox Code Playgroud)

VS

rdd = sc.textFile(demo.txt)
rdd.partitionBy(100)
Run Code Online (Sandbox Code Playgroud)

Nic*_*mas 24

我相信slices并且partitions在Apache Spark中也是如此.

但是,您发布的两段代码之间存在微妙但可能存在显着差异.

此代码将尝试demo.txt使用100个并发任务直接加载到100个分区:

rdd = sc.textFile('demo.txt', 100)
Run Code Online (Sandbox Code Playgroud)

对于未压缩的文本,它将按预期工作.但是如果不是demo.txt你有一个demo.gz,你将得到一个只有1个分区的RDD.对gzip压缩文件的读取无法并行化.

另一方面,以下代码将首先打开demo.txt具有默认分区数的RDD,然后它将显式地将数据重新分区为大小大致相等的100个分区.

rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)
Run Code Online (Sandbox Code Playgroud)

因此,在这种情况下,即使使用了一个,demo.gz您最终也会得到一个包含100个分区的RDD.

作为旁注,我替换了你partitionBy(),repartition()因为我认为你正在寻找.partitionBy()要求RDD是元组的RDD.由于repartition()在Spark 0.8.0中不可用,您应该可以使用coalesce(100, shuffle=True).

Spark可以为RDD的每个分区运行1个并发任务,最多可以为集群中的核心数量.因此,如果您有一个包含50个内核的群集,您希望您的RDD至少有50个分区(可能是分区的2-3倍).

从Spark 1.1.0开始,您可以检查RDD具有的分区数,如下所示:

rdd.getNumPartitions()  # Python API
rdd.partitions.size     // Scala API
Run Code Online (Sandbox Code Playgroud)

在1.1.0之前,使用Python API执行此操作的方法是rdd._jrdd.splits().size().