如何注册无状态处理器(似乎也需要StateStore)?

eth*_*nny 5 java apache-kafka-streams

我正在构建拓扑,并希望使用KStream.process()将一些中间值写入数据库.此步骤不会更改数据的性质,并且完全是无状态的.

添加处理器需要创建一个ProcessorSupplier并将此实例KStream.process()与状态存储的名称一起传递给该函数.这是我不明白的.

如何将StateStore对象添加到拓扑中,因为它需要StateStoreSupplier

StateStore启动应用程序时,无法添加说明会出现此错误:

线程"main"中的异常org.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑构建:尚未添加StateStore my-state-store.

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的.

通过应用处理器来处理此流中的所有元素,一次一个元素.

Mic*_*oll 12

这是一个关于如何使用状态存储简单示例,取自Kafka Streams上Confluent Platform文档.

第1步:定义StateStore/ StateStoreSupplier:

StateStoreSupplier countStore = Stores.create("Counts")
                                      .withKeys(Serdes.String())
                                      .withValues(Serdes.Long())
                                      .persistent()
                                      .build();
Run Code Online (Sandbox Code Playgroud)
  1. 我没有看到将StateStore对象添加到拓扑的方法.它也需要StateStoreSupplier.

步骤2:将状态存储添加到拓扑中.

选项A - 使用Processor API时:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
       .addProcessor("Process", () -> new WordCountProcessor(), "Source")
       // Add the countStore associated with the WordCountProcessor processor
       .addStateStore(countStore, "Process")
       .addSink("Sink", "sink-topic", "Process");
Run Code Online (Sandbox Code Playgroud)

选项B - 使用Kafka Streams DSL时:

在这里,您需要调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中.然后,在调用诸如KStream#process()或的方法时KStream#transform(),您还必须传入状态存储的名称 - 否则您的应用程序将在运行时失败.

举个例子KStream#transform():

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());
Run Code Online (Sandbox Code Playgroud)

为什么处理器需要有状态存储?对于无状态且不维护状态的处理器来说,这似乎是可选的.

你是对的 - 如果处理器没有维持状态,你就不需要状态存储.

使用DSL时,您只需调用KStreamBuilder#addStateStore("name-of-your-store")将状态存储添加到处理器拓扑中,稍后再引用它.