标签: rocksdb

用 Kafka Streams 中的内存状态存储替换 RocksDB

我正在使用 Kafka Streams 0.10.1.1 版本。

状态存储的 RocksDB 实现无法处理我们的 50k/msg 速率,因此我想将状态存储更改为内存中的状态存储。根据文档,这应该是可能的:http : //docs.confluent.io/3.1.0/streams/architecture.html#state

但是,当我实现这一点时:

val stateStore = Stores.create(stateStoreName).withStringKeys().withStringKeys().inMemory().build()

val procSuppl: KStreamAggregate = ... // I'll spare the implementation details

streamBuilder.addSource(
  "mysource",
  new StringDeserializer(),
  new StringDeserializer(),
  "input_topic"
).addProcessor("proc", procSuppl,  "mysource").addStateStore(stateStore, "proc")
Run Code Online (Sandbox Code Playgroud)

我最终在运行时出现此错误:

Caused by: java.lang.ClassCastException: org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to org.apache.kafka.streams.state.internals.CachedStateStore
2017-01-23T13:19:11.830674020Z  at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.init(KStreamAggregate.java:62)
Run Code Online (Sandbox Code Playgroud)

上述方法的实现是:

Caused by: java.lang.ClassCastException: org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to org.apache.kafka.streams.state.internals.CachedStateStore
2017-01-23T13:19:11.830674020Z  at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.init(KStreamAggregate.java:62)
Run Code Online (Sandbox Code Playgroud)

为什么要尝试将状态存储转换为CachedStateStore实例?如何根据文档实现一个简单的内存状态存储?

谢谢

apache-kafka rocksdb apache-kafka-streams

2
推荐指数
1
解决办法
4019
查看次数

如何将RocksDB安装到ubuntu中?

RocksDB是Facebook创建的键/值数据库,效果很好,但是缺少有关如何安装生产版本的文档。

ubuntu rocksdb

2
推荐指数
2
解决办法
4605
查看次数

Flink 中的 RocksDBStateBackend:它究竟是如何工作的?

我已经阅读了 Flink 关于状态后端的官方文档,这里。特别是,我对RocksDBStateBackend很感兴趣。

我不明白,如果我启用这种后端,RocksDB 是否可以通过 Flink 集群内的另一个节点从TaskManagers访问?

到目前为止,我对 RocksDBStateBackend 的理解是任务管理器将状态存储在它们的内存中,即 JVM 进程的内存中。之后,他们会将状态发送到存储在 RocksDB 中吗?如果是,Flink 集群中的 RocksDB 在哪里?物理上在哪里?

rocksdb apache-flink

2
推荐指数
1
解决办法
625
查看次数

Kafka 流状态目录 io 错误

流运行一定时间后给出以下错误?我找不到谁负责创建 .sst 文件?

环境:

卡夫卡版本 0.10.0-cp1

Scala 2.11.8

    org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:424)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:414)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:330)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:247)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:446)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:434)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:340)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.rocksdb.RocksDBException: IO error: /tmp/kafka-streams/pos/0_0/rocksdb/agg/000008.sst: No such file or directory
        at org.rocksdb.RocksDB.flush(Native Method)
        at org.rocksdb.RocksDB.flush(RocksDB.java:1329)
        at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:422)
        ... 9 more
[2016-06-24 11:13:54,910] ERROR Failed to commit StreamTask #0_0 in thread [StreamThread-1]:  (org.apache.kafka.streams.processor.internals.StreamThread:452)
org.apache.kafka.streams.errors.ProcessorStateException: Error while batch writing to store agg
        at org.apache.kafka.streams.state.internals.RocksDBStore.putAllInternal(RocksDBStore.java:324)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka rocksdb apache-kafka-streams

1
推荐指数
1
解决办法
2764
查看次数

如何完全关闭 Rocksdb?

我在应用程序中使用 Rocksdb,在关闭所有 db 实例后,我仍然看到运行着 Rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long) () 的线程

因此,当我关闭应用程序时,我收到一个 libc++abi.dylib: terifying 错误,我认为这是由于上述原因造成的。如何避免这种情况并确保这些线程已关闭?谢谢

rocksdb

1
推荐指数
1
解决办法
891
查看次数