Flink中如何连接2个以上的流?

use*_*551 6 apache-flink flink-streaming

我有 3 个不同类型的键控数据流。

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
Run Code Online (Sandbox Code Playgroud)

每个流都定义了自己的处理逻辑,并在它们之间共享状态。我想连接这 3 个流,只要任何流中有数据可用,就会触发相应的处理函数。可以连接两个流。

first.connect(second).process(<CoProcessFunction>)
Run Code Online (Sandbox Code Playgroud)

我无法使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。

Dav*_*son 5

除了联合之外,标准方法是使用级联连接,例如,

first.connect(second).process(...).connect(third).process(...)

您将无法在一处共享所有三个流之间的状态。您可以让第一个处理函数输出后续处理函数所需的任何内容,但第三个流将无法影响第一个处理函数中的状态,这对于某些用例来说是一个问题。

另一种可能性可能是利用较低级别的机制 - 请参阅FLIP-92:在 Flink 中添加 N-Ary Stream Operator。但是,此机制仅供内部使用(Table/SQL API 将其用于 n 路联接),需要谨慎对待。有关详细信息,请参阅邮件列表讨论。我提到这一点是为了完整性,但我怀疑这是否是一个好主意,直​​到界面得到进一步开发。

您可能还想查看有状态函数 api,它克服了数据流 api 的许多限制。


kkr*_*ler 5

实际上,包装器方法还不错。您可以创建一个EitherOfThree<T1, T2, T3>类似于 Flink 现有的包装类Either<Left, Right>,然后在单个函数中处理这些记录的流。就像是:

    DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
        .union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
        .union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
    combo.process(new MyProcessFunction());
Run Code Online (Sandbox Code Playgroud)

Flink 的Either类有一个更优雅的实现,但对于您的用例来说,简单的东西应该可以工作。