Bat*_*ter 5 scala partitioning data-partitioning apache-flink
我正在使用DataSet API在Flink上使用Scala。我想在节点之间重新划分数据。Spark具有让用户使用给定numberOfPartitions参数(link)对数据进行重新分区的功能,我相信Flink不支持该功能。因此,我想通过实现自定义分区功能来实现这一目标。
我的数据类型为DataSet(Double,SparseVector)来自数据的示例行:
(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
Run Code Online (Sandbox Code Playgroud)
由于“ Double”是二进制(1或-1),因此我想根据SparceVector的长度对数据进行分区。我的自定义分区器如下:
class myPartitioner extends Partitioner[SparseVector]
{
override def partition(key: SparseVector, numPartitions: Int): Int = {
key.size % numPartitions
}
}
Run Code Online (Sandbox Code Playgroud)
我将此自定义分区称为:
data.partitionCustom(new myPartitioner(),1)
Run Code Online (Sandbox Code Playgroud)
有人可以帮我理解在Scala中调用myPartitioner函数时如何将分区数指定为“ numPartitions”参数。
谢谢。
Spark使用repartition(n: Int)函数将数据重新分配到n个分区,这些分区将由n个任务处理。从我的角度来看,这包括两个变化:数据重新分配和下游任务数量。
因此,在 Apache Flink 中,我认为 Partitioner 映射到数据重新分配,并行度映射到下游任务的数量,这意味着您可以使用 setParallelism 来确定“numPartitions”。
| 归档时间: |
|
| 查看次数: |
341 次 |
| 最近记录: |