flink作业不是跨机器分布的

Bal*_*ala 9 scala batch-processing apache-flink

我在Apache flink中有一个小用例,即批处理系统.我需要处理一组文件.每个文件的处理必须由一台机器处理.我有以下代码.始终只占用一个任务槽,并且一个接一个地处理文件.我有6个节点(所以6个任务管理器),并在每个节点配置4个任务槽.所以,我希望一次处理24个文件.

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out:org.apache.flink.util.Collector[Int])
    : Unit  =  {
    var temp = myfiles.iterator()
    while(temp.hasNext()){
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh  " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach{println}
      out.collect(1)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我启动了flink as ./bin/start-cluster.sh命令,Web用户界面显示它有6个任务管理器,24个任务槽.

这些文件夹包含大约49个文件.当我在这个集合上创建mapPartition时,我希望跨越49个并行进程.但是,在我的基础设施中,它们都是一个接一个地处理的.这意味着只有一台机器(一个任务管理器)处理所有49个文件名.我想要的是,每个插槽配置2个任务,我希望同时处理24个文件.

任何指针肯定会有所帮助.我在flink-conf.yaml文件中有这些参数

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Run Code Online (Sandbox Code Playgroud)

提前致谢.谁能让我知道我哪里出错了?

Til*_*ann 2

正如 David 所描述的,问题是env.fromCollection(Iterable[T])创建一个DataSource具有非并行的InputFormat。因此, 的DataSource执行并行度为1。后续运算符 ( mapPartition) 从源继承了这种并行性,以便它们可以链接起来(这为我们节省了一次网络洗牌)。

解决这个问题的方法是显式地重新平衡DataSet

env.fromCollection(folders).rebalance()
Run Code Online (Sandbox Code Playgroud)

或者在后续运算符 ( mapPartition) 处显式设置所需的并行度:

env.fromCollection(folders).mapPartition(...).setParallelism(49)
Run Code Online (Sandbox Code Playgroud)