使用Kafka Stream中的状态存储(RocksDB)将一条记录转换为多条记录

Gov*_*Raj 0 java apache-kafka apache-kafka-streams

我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames),但如何返回多个KeyValue对,以便我稍后可以使用分支发布到受尊重的主题?

有一种方法可以将数据转发到下游,但如何再次使用该数据?

卡夫卡版本 - 1.1.0

Bru*_*nna 6

如果我理解正确,您希望根据状态存储中的数据发出多条记录。在transform()Kafka Streams 2.2 之前,您可以通过context.forward()Transformer. 例如:

stream
   .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
       private ProcessorContext context;

       @Override
       public void init(final ProcessorContext context) {
           this.context = context;
       }

       @Override
       public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
           context.forward(key, value);
           context.forward(key + 1, value + 1;)
           return null;
       }

       @Override
       public void close() {
       }
   }, stateStoreName);
Run Code Online (Sandbox Code Playgroud)

请注意,通过使用context.forward(),您没有编译时类型安全性。如果转发的记录的类型与输出的类型不符KStream<Integer, Integer>在上面的示例中),代码会编译,但在运行时会引发异常。

从 Kafka Streams 2.2 开始,您可以使用flatTransform(). 您可以flatTransform()返回记录列表,而不是像上面的示例那样context.forward()多次使用并返回。null以这种方式使用flatTransform()可以保证编译时类型安全。