我使用的是flink 1.3,我定义了两个流源,它们将发出相同的事件以供后续运算符处理(我定义的进程运算符和接收器运算符)
但看起来在 source-process-pink 管道中,我只能指定一个源,我会问如何指定两个或多个源并执行相同的进程和接收器
object FlinkApplication {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.addSource(new MySource1()) //How to MySource2 here?
.setParallelism(1)
.name("source1")
.process(new MyProcess())
.setParallelism(4)
.addSink(new MySink())
.setParallelism(2)
env.execute("FlinkApplication")
}
}
Run Code Online (Sandbox Code Playgroud)
该 API 在如何设置处理管道方面提供了很大的灵活性。如果您想将相同的逻辑应用于多个源,您可以这样做:
env.addSource(new MySource1())
.process(new MyProcess())
.addSink(new MySink())
env.addSource(new MySource2())
.process(new MyProcess())
.addSink(new MySink())
env.execute()
Run Code Online (Sandbox Code Playgroud)
或者,如果这样做更有意义,您可以合并两个流,然后处理组合流(或这些方法的某种组合):
stream1.union(stream2)
.process(...)
.addSink(...)
Run Code Online (Sandbox Code Playgroud)
如果您想分叉流并对每个副本应用不同的操作,也可以以相反的方式执行操作:
val stream: DataStream[T] = env.addSource(new MySource())
stream.process(new MyProcess1())
.addSink(new MySink1())
stream.process(new MyProcess2())
.addSink(new MySink2())
env.execute()
Run Code Online (Sandbox Code Playgroud)
哇,Flink 1.3 已经三年多了!
| 归档时间: |
|
| 查看次数: |
1107 次 |
| 最近记录: |