Flink自定义分区功能

Bat*_*ter 5 scala partitioning data-partitioning apache-flink

我正在使用DataSet API在Flink上使用Scala。我想在节点之间重新划分数据。Spark具有让用户使用给定numberOfPartitions参数(link)对数据进行重新分区的功能,我相信Flink不支持该功能。因此,我想通过实现自定义分区功能来实现这一目标。

我的数据类型为DataSet(Double,SparseVector)来自数据的示例行:

(1.0 SparseVector((2024,1.0), (2025,1.0), (2030,1.0), (2045,1.0), (2046,1.41), (2063,1.0), (2072,1.0), (3031,1.0), (3032,1.0), (4757,1.0), (4790,1.0), (177196,1.0), (177197,0.301), (177199,1.0), (177202,1.0), (1544177,1.0), (1544178,1.0), (1544179,1.0), (1654031,1.0), (1654190,1.0), (1654191,1.0), (1654192,1.0), (1654193,1.0), (1654194,1.0), (1654212,1.0), (1654237,1.0), (1654238,1.0)))
Run Code Online (Sandbox Code Playgroud)

由于“ Double”是二进制(1或-1),因此我想根据SparceVector的长度对数据进行分区。我的自定义分区器如下:

class myPartitioner extends Partitioner[SparseVector]
{ 
    override def partition(key: SparseVector, numPartitions: Int): Int = {
         key.size % numPartitions
    } 
}
Run Code Online (Sandbox Code Playgroud)

我将此自定义分区称为:

data.partitionCustom(new myPartitioner(),1)
Run Code Online (Sandbox Code Playgroud)

有人可以帮我理解在Scala中调用myPartitioner函数时如何将分区数指定为“ numPartitions”参数。

谢谢。

Jia*_*iao 2

Spark使用repartition(n: Int)函数将数据重新分配到n个分区,这些分区将由n个任务处理。从我的角度来看,这包括两个变化:数据重新分配和下游任务数量。

因此,在 Apache Flink 中,我认为 Partitioner 映射到数据重新分配,并行度映射到下游任务的数量,这意味着您可以使用 setParallelism 来确定“numPartitions”。