use*_*551 6 apache-flink flink-streaming
我有 3 个不同类型的键控数据流。
DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
Run Code Online (Sandbox Code Playgroud)
每个流都定义了自己的处理逻辑,并在它们之间共享状态。我想连接这 3 个流,只要任何流中有数据可用,就会触发相应的处理函数。可以连接两个流。
first.connect(second).process(<CoProcessFunction>)
Run Code Online (Sandbox Code Playgroud)
我无法使用联合(允许多个数据流),因为类型不同。我想避免创建包装器并将所有流转换为相同类型。
除了联合之外,标准方法是使用级联连接,例如,
first.connect(second).process(...).connect(third).process(...)
您将无法在一处共享所有三个流之间的状态。您可以让第一个处理函数输出后续处理函数所需的任何内容,但第三个流将无法影响第一个处理函数中的状态,这对于某些用例来说是一个问题。
另一种可能性可能是利用较低级别的机制 - 请参阅FLIP-92:在 Flink 中添加 N-Ary Stream Operator。但是,此机制仅供内部使用(Table/SQL API 将其用于 n 路联接),需要谨慎对待。有关详细信息,请参阅邮件列表讨论。我提到这一点是为了完整性,但我怀疑这是否是一个好主意,直到界面得到进一步开发。
您可能还想查看有状态函数 api,它克服了数据流 api 的许多限制。
实际上,包装器方法还不错。您可以创建一个EitherOfThree<T1, T2, T3>类似于 Flink 现有的包装类Either<Left, Right>,然后在单个函数中处理这些记录的流。就像是:
DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
.union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
.union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
combo.process(new MyProcessFunction());
Run Code Online (Sandbox Code Playgroud)
Flink 的Either类有一个更优雅的实现,但对于您的用例来说,简单的东西应该可以工作。
| 归档时间: |
|
| 查看次数: |
3592 次 |
| 最近记录: |