Apache Flink 中是否有相当于 Kafka 的 KTable 的东西?

vic*_*tim 5 apache-flink apache-kafka-streams

Apache Kafka 有一个 KTable 的概念,其中

其中每个数据记录代表一个更新

本质上,我可以使用一个 kafka 主题,并且只保留每个键的最新消息。

Apache Flink 中是否有类似的概念?我已经阅读了Flink 的 Table API,但似乎没有解决同样的问题。

一些帮助比较和对比这两个框架会很有帮助。我并不是在寻找哪个更好或更差。而是它们有何不同。正确的答案取决于我的要求。

Fab*_*ske 5

你是对的。Flink的Table API及其Table类与Kafka的KTable不对应。Table API 是一种关系语言嵌入式 API(想想 Java 和 Scala 中集成的 SQL)。

Flink 的 DataStream API 没有对应于 KTable 的内置概念。相反,Flink 提供复杂的状态管理,并且 KTable 将是具有键控状态的常规运算符。

例如,具有两个输入的有状态运算符存储从第一个输入观察到的最新值并将其与第二个输入的值连接起来,可以使用 a 实现,CoFlatMapFunction如下所示:

DataStream<Tuple2<Long, String>> first = ...
DataStream<Tuple2<Long, String>> second = ...

DataStream<Tuple2<String, String>> result = first
  // connect first and second stream
  .connect(second)
  // key both streams on the first (Long) attribute
  .keyBy(0, 0)
  // join them
  .flatMap(new TableLookup());

// ------

public static class TableLookup 
  extends RichCoFlatMapFunction<Tuple2<Long,String>, Tuple2<Long,String>, Tuple2<String,String>> {

  // keyed state
  private ValueState<String> lastVal;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<String> valueDesc = 
      new ValueStateDescriptor<String>("table", Types.STRING);
    lastVal = getRuntimeContext().getState(valueDesc);
  }

  @Override
  public void flatMap1(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
    // update the value for the current Long key with the String value.
    lastVal.update(value.f1);
  }

  @Override
  public void flatMap2(Tuple2<Long, String> value, Collector<Tuple2<String, String>> out) throws Exception {
    // look up latest String for current Long key.
    String lookup = lastVal.value();
    // emit current String and looked-up String
    out.collect(Tuple2.of(value.f1, lookup));
  }
}
Run Code Online (Sandbox Code Playgroud)

一般来说,状态可以非常灵活地与 Flink 一起使用,让您可以实现广泛的用例。还有更多状态类型,例如ListStateMapStateProcessFunction您可以对时间进行细粒度控制,例如,如果某个键在一段时间内没有更新,则删除该键的状态(KTables 有一个配置)据我所知)。