Ame*_*url 4 python apache-spark rdd pyspark
我正在编写一个pyspark脚本来读取一个大的二维数组,所以我尝试先生成一个索引数组并使用read方法映射以读入相应的数组.例如,如果我有一个包含10行的数组,那么我希望这10行被均匀分区,因为每个分区有2行.我用sortBy()尝试过这种方式:
rdd = sc.range(0, 10, 1).sortBy(lambda x: x, numPartitions = 5)
rdd.glom().collect()
Run Code Online (Sandbox Code Playgroud)
但结果显示为:
[[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]
Run Code Online (Sandbox Code Playgroud)
这表示sortBy()没有按预期工作,因此第一个分区有3个数字,而最后一个分区只有1个数字.当我用另一个读取方法映射每个分区时,分区的大小是不同的,有时会产生落后者.
我尝试了另一种RDD生成方式:
rdd = sc.parallelize(range(0, 10, 1), 5)
rdd.glom().collect()
Run Code Online (Sandbox Code Playgroud)
它返回我想要的结果.
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
Run Code Online (Sandbox Code Playgroud)
有人可以帮助解释为什么使用sortBy()的第一个方法不能返回均匀排序的结果?
因为它不是为了设计的.通常情况下,不可能对数据进行分区(包括范围分区)以实现相同大小的分区.请记住,通过分区程序的合同,特定值的所有记录都必须驻留在单个分区上.即使在可以实现均匀分布的情况下,确定精确的分区边界也会非常昂贵.
因为Spark的样本数据的目标是获得大致相同大小的范围,这种行为对于典型的Spark应用程序来说已经足够了.
SparkContext.parallelize
根本不使用分区器.相反,它根据特定输入的语义计算拆分,因此可以创建相同大小的拆分.
如果您事先了解数据分布,则可以始终设计自定义分区功能,从而获得所需的输出.例如:
import bisect
from functools import partial
partition_func = partial(bisect.bisect, [2, 4, 6, 8])
(sc.range(0, 10)
.map(lambda x: (x, None))
.repartitionAndSortWithinPartitions(5, partition_func)
.keys())
Run Code Online (Sandbox Code Playgroud)
对于CPython中相对较短(最多1 << 60左右)的整数系列,您可以使用散列分区:
(sc.range(0, 10, 1)
.map(lambda x: (x, None))
.partitionBy(10)
.keys()
.glom()
.collect())
Run Code Online (Sandbox Code Playgroud)
[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]]
Run Code Online (Sandbox Code Playgroud)
但仅仅是一个实现细节(hash(x)
这里isinstance(x, int)
等于x
).
归档时间: |
|
查看次数: |
755 次 |
最近记录: |