eth*_*nny 5 java apache-kafka-streams
我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库.此步骤不会更改数据的性质,并且完全是无状态的.
添加处理器需要创建一个ProcessorSupplier并将此实例KStream.process()与状态存储的名称一起传递给该函数.这是我不明白的.
如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier?
StateStore启动应用程序时,无法添加说明会出现此错误:
线程"main"中的异常org.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store.
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的.
通过应用处理器来处理此流中的所有元素,一次一个元素.
Mic*_*oll 12
这是一个关于如何使用状态存储的简单示例,取自Kafka Streams上的Confluent Platform文档.
第1步:定义StateStore/ StateStoreSupplier:
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
Run Code Online (Sandbox Code Playgroud)
- 我没有看到将StateStore对象添加到拓扑的方法.它也需要StateStoreSupplier.
步骤2:将状态存储添加到拓扑中.
选项A - 使用Processor API时:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
.addProcessor("Process", () -> new WordCountProcessor(), "Source")
// Add the countStore associated with the WordCountProcessor processor
.addStateStore(countStore, "Process")
.addSink("Sink", "sink-topic", "Process");
Run Code Online (Sandbox Code Playgroud)
选项B - 使用Kafka Streams DSL时:
在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中.然后,在调用诸如KStream#process()或的方法时KStream#transform(),您还必须传入状态存储的名称 - 否则您的应用程序将在运行时失败.
举个例子KStream#transform():
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
input.transform(/* your TransformerSupplier */, countStore.name());
Run Code Online (Sandbox Code Playgroud)
为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的.
你是对的 - 如果处理器没有维持状态,你就不需要状态存储.
使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中,稍后再引用它.
| 归档时间: |
|
| 查看次数: |
5728 次 |
| 最近记录: |