如何在flink应用程序中指定两个源、一个进程运算符和一个接收器运算符

Tom*_*Tom 0 apache-flink

我使用的是flink 1.3,我定义了两个流源,它们将发出相同的事件以供后续运算符处理(我定义的进程运算符和接收器运算符)

但看起来在 source-process-pink 管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器

object FlinkApplication {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.addSource(new MySource1()) //How to MySource2 here?
      .setParallelism(1)
      .name("source1")
      .process(new MyProcess())
      .setParallelism(4)
      .addSink(new MySink())
      .setParallelism(2)
    env.execute("FlinkApplication")
  }

}
Run Code Online (Sandbox Code Playgroud)

Dav*_*son 6

该 API 在如何设置处理管道方面提供了很大的灵活性。如果您想将相同的逻辑应用于多个源,您可以这样做:

env.addSource(new MySource1())
  .process(new MyProcess())
  .addSink(new MySink())

env.addSource(new MySource2())
  .process(new MyProcess())
  .addSink(new MySink())

env.execute()
Run Code Online (Sandbox Code Playgroud)

或者,如果这样做更有意义,您可以合并两个流,然后处理组合流(或这些方法的某种组合):

stream1.union(stream2)
  .process(...)
  .addSink(...)
Run Code Online (Sandbox Code Playgroud)

如果您想分叉流并对每个副本应用不同的操作,也可以以相反的方式执行操作:

val stream: DataStream[T] = env.addSource(new MySource())

stream.process(new MyProcess1())
  .addSink(new MySink1())

stream.process(new MyProcess2())
  .addSink(new MySink2())

env.execute()
Run Code Online (Sandbox Code Playgroud)

哇,Flink 1.3 已经三年多了!