Gov*_*Raj 0 java apache-kafka apache-kafka-streams
我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames),但如何返回多个KeyValue对,以便我稍后可以使用分支发布到受尊重的主题?
有一种方法可以将数据转发到下游,但如何再次使用该数据?
卡夫卡版本 - 1.1.0
如果我理解正确,您希望根据状态存储中的数据发出多条记录。在transform()Kafka Streams 2.2 之前,您可以通过context.forward()在Transformer. 例如:
stream
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
context.forward(key, value);
context.forward(key + 1, value + 1;)
return null;
}
@Override
public void close() {
}
}, stateStoreName);
Run Code Online (Sandbox Code Playgroud)
请注意,通过使用context.forward(),您没有编译时类型安全性。如果转发的记录的类型与输出的类型不符KStream(<Integer, Integer>在上面的示例中),代码会编译,但在运行时会引发异常。
从 Kafka Streams 2.2 开始,您可以使用flatTransform(). 您可以flatTransform()返回记录列表,而不是像上面的示例那样context.forward()多次使用并返回。null以这种方式使用flatTransform()可以保证编译时类型安全。
| 归档时间: |
|
| 查看次数: |
1692 次 |
| 最近记录: |