Replacing RocksDB with an in-memory state store in Kafka Streams

use*_*530 2 apache-kafka rocksdb apache-kafka-streams

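I'm using Kafka Streams 0.10.1.1.

The RocksDB implementation of the state store can't keep up with our rate of 50k msg/s, so I want to switch to an in-memory state store. According to the documentation this should be possible: http://docs.confluent.io/3.1.0/streams/architecture.html#state

However, when I implement it like this: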

val stateStore = Stores.create(stateStoreName).withStringKeys().withStringValues().inMemory().build()

val procSuppl: KStreamAggregate = ... // I'll spare the implementation details

streamBuilder.addSource(
  "mysource",
  new StringDeserializer(),
  new StringDeserializer(),
  "input_topic"
).addProcessor("proc", procSuppl,  "mysource").addStateStore(stateStore, "proc")

At runtime I end up with this error:

Caused by: java.lang.ClassCastException: org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to org.apache.kafka.streams.state.internals.CachedStateStore
2017-01-23T13:19:11.830674020Z  at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.init(KStreamAggregate.java:62)

The failing line, KStreamAggregate.java:62, is inside KStreamAggregateProcessor.init, which is where the state store gets cast to CachedStateStore.

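Why does it try to cast the state store to a CachedStateStore instance? How can I implement a simple in-memory state store, as described in the documentation?

Thanks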

小智 6

To create an in-memory state store, first create a store supplier using the Stores factory object:

val storeSupplier = Stores.inMemoryKeyValueStore("in-mem")

Then use the store supplier when materializing a KTable:

val wordCounts =  builder
  .stream[String, String]("streams-plaintext-input")
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
  .groupBy((_, word) => word)
  .count()(Materialized.as(storeSupplier))

To get the queryable store:

val qStore = streams.store(
  wordCounts.queryableStoreName,
  QueryableStoreTypes.keyValueStore[String, Long])
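
For reference, the three steps can be combined into one program. This is only a sketch under stated assumptions, not a definitive implementation: it assumes the kafka-streams-scala DSL from version 2.7 or later (Stores.inMemoryKeyValueStore arrived with Kafka Streams 1.0 and the Scala DSL with 2.0, so none of it applies to 0.10.1.1). The object name InMemoryWordCount, the application id, and the broker address localhost:9092 are placeholders chosen for illustration.

```scala
import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StoreQueryParameters, StreamsConfig, Topology}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.Materialized
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore, Stores}

// Hypothetical object name; assumes kafka-streams-scala 2.7+ on the classpath.
object InMemoryWordCount {
  val storeName = "in-mem"

  // Builds the word-count topology; no broker connection is needed for this step.
  def buildTopology(): Topology = {
    // Step 1: a supplier for an in-memory (non-RocksDB) key-value store.
    val storeSupplier = Stores.inMemoryKeyValueStore(storeName)

    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("streams-plaintext-input")
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
      .groupBy((_, word) => word)
      // Step 2: materialize the count into the in-memory store.
      .count()(Materialized.as[String, Long](storeSupplier))

    builder.build()
  }

  // Step 3: look up the queryable store. Call this only after the
  // KafkaStreams instance has reached the RUNNING state.
  def countsStore(streams: KafkaStreams): ReadOnlyKeyValueStore[String, Long] =
    streams.store(
      StoreQueryParameters.fromNameAndType(
        storeName,
        QueryableStoreTypes.keyValueStore[String, Long]()))

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "in-mem-wordcount") // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker

    val streams = new KafkaStreams(buildTopology(), props)
    streams.start()
    sys.addShutdownHook(streams.close())
  }
}
```

Keeping the topology in its own method means it can be inspected with buildTopology().describe() without a running broker. Note that an in-memory store still has to be rebuilt from its changelog topic after a restart, so the switch trades recovery time for throughput.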