We have large MongoDB database (about 1,4mln collections), MongoDB 3.0, engine rocksDB, operating system Ubuntu 14.04.
This DB is located on virtual machine (VmWare vCloud) with 16 cores and 108 GB RAM (currently mongoDB used 70GB of memory without swap).
Production setup options:
DB stats:
{
"db" : "ctp",
"collections" : 1369486,
"objects" : 20566852,
"avgObjSize" : 1126.82749999854,
"dataSize" : 23175294422,
"storageSize" : 23231888384,
"numExtents" …Run Code Online (Sandbox Code Playgroud) 我正在开发Windows机器上编写Kafka Streams应用程序.如果我尝试使用leftJoin和branch卡夫卡流的特点,我得到执行的jar应用程序时的错误如下:
Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can't find dependent libraries
at java.lang.ClassLoader$NativeLibrary.load(Native Method)
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
at org.rocksdb.Options.<clinit>(Options.java:22)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38)
at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75)
at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72)
at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at …Run Code Online (Sandbox Code Playgroud) 我认为Kafka Streams可以提供帮助,但是我找不到有帮助的文档/示例.
我找到了一个类似的问题,但它没有任何实现建议(我目前丢失的地方):Kafka Streams等待依赖对象的功能
我想做的事:
我想将Kafka主题中的相关记录关联到单个对象中并生成该新对象.例如,可能有5个消息记录通过唯一键相互关联 - 我想从这些相关对象构建一个新对象,并将其生成到新队列.
我希望所有相关事件都能在一小时内消耗掉.卡夫卡将其描述为滑动窗口.一旦ID为"123"的消息记录A到达消费者,应用程序必须至少等待一小时才能到达ID为"123"的剩余记录.在所有记录到达或一小时后,它们都是过期记录.
最后,一小时内收集的所有相关消息都用于创建新的Object,然后发送到另一个Kafka队列.
我遇到的问题.
Kafka中的滑动窗口似乎只有在将两个流连接在一起时才能工作.我们只有一个流连接到主题 - 我不知道为什么需要两个流或我们将如何实现这一点.我在网上找不到这个例子.我在Kafka中看到的所有流函数在收集相同密钥的事件时简单地聚合/缩减为简单值.例如,键出现的次数或累加某些值
这里有一些伪代码来描述我在说什么.如果功能存在,函数名称/语义将会不同.
KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
kstream.windowedBy(
// One hour sliding Window
)
.collectAllRelatedKeys(
// Collect all Records related to each key
// map == HashMap<Key, ArrayList<Value>>
map.get(key).add(value);
)
.transformAndProcess(
if(ALL_EVENTS_COLLECTED) {
// Create new Object from all related records
newObject =
createNewObjectFromRelatedRecordsFunction(map.get(key));
producer.send(newObject);
}
)
Run Code Online (Sandbox Code Playgroud)
问题(谢谢你的帮助):
我知道 Cassandra、rocksdb 等 DBS 中的分级压缩是如何工作的。有些级别的最大数量为 4,有些级别为 7。这个数字如何影响压缩过程?为什么我不能只有 2 个级别,第一个已刷新内存表数据(文件之间可能重叠)和第二个包含非重叠 SST?
如果有任何文档或重复问题,请重定向。
编辑 1:当级别数增加时,重复数据会增加。
鉴于持久键/值存储的以下要求:
鉴于这种使用模式:
鉴于要求,最好的磁盘数据结构/算法是什么?
自定义实现能否超过基于 LSM(日志结构化合并)的实现(即 leveldb、rocksdb)的性能?
满足这些要求的高性能自定义实现在实现上是否也会相当简单?
是否可以有效地获取存储在RocksDB键值存储中的键值对的数量?
我查看了wiki,到目前为止还没有看到任何讨论这个主题的内容.这种手术甚至可能吗?
我正在iOS上使用RocksDB,并且正在使用AsyncStorage适配器以及redux-persist。每当我启动应用程序时,都会出现错误:
Failed to open db at path /Users/chadwilken/Library/Developer/CoreSimulator/Devices/818C47D2-ECF0-4003-865E-1FCAADCEF624/data/Containers/Data/Application/6C4F8F80-52E3-48B1-8ED5-84FB9F087514/Documents/RKAsyncRocksDBStorage.
RocksDB Status: IO error: lock /Users/chadwilken/Library/Developer/CoreSimulator/Devices/818C47D2-ECF0-4003-865E-1FCAADCEF624/data/Containers/Data/Application/6C4F8F80-52E3-48B1-8ED5-84FB9F087514/Documents/RKAsyncRocksDBStorage/LOCK: No locks available.
Run Code Online (Sandbox Code Playgroud)
充当适配器的类是:
// Copyright 2004-present Facebook. All Rights Reserved.
#import "AsyncRocksDBStorage.h"
#include <string>
#import <Foundation/Foundation.h>
#import <RCTConvert.h>
#import <RCTLog.h>
#import <RCTUtils.h>
#include <rocksdb/db.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>
#include <rocksdb/slice.h>
#include <rocksdb/status.h>
static NSString *const RKAsyncRocksDBStorageDirectory = @"RKAsyncRocksDBStorage";
namespace {
rocksdb::Slice SliceFromString(NSString *string)
{
return rocksdb::Slice((const char *)[string UTF8String], [string lengthOfBytesUsingEncoding:NSUTF8StringEncoding]);
}
void deepMergeInto(NSMutableDictionary *output, NSDictionary *input) {
for (NSString *key in input) …Run Code Online (Sandbox Code Playgroud) 当我使用它构建RocksDB 时make static_lib,会生成一个 200MB+ 的librocksdb.a文件,但是当我通过包管理器(与Brew和apt相比)安装相同版本时,该.a文件只有大约 11MB。我错过了什么?
使用make static_lib以下方法从源构建时库的大小:
ubuntu@local:~/rocksdb-4.1$ du -sh librocksdb.a
238M librocksdb.a
Run Code Online (Sandbox Code Playgroud)
sudo apt-get install librocksdb-dev在 Xenial 上安装的库的大小:
ubuntu@local:~/rocksdb-4.1$ du -sh /usr/lib/librocksdb.a
11M /usr/lib/librocksdb.a
Run Code Online (Sandbox Code Playgroud)
为什么会有这么大的差别?
文档不清楚。我什么时候想将保留重复项设置为假/真。这是做什么用的?它是针对 RocksDB 中的特定内容吗?
挖掘流内部代码似乎被用来设置一些序列号?
RocksDBWindowStore.java
private void maybeUpdateSeqnumForDups() {
if (this.retainDuplicates) {
this.seqnum = this.seqnum + 1 & 2147483647;
}
Run Code Online (Sandbox Code Playgroud) 有没有办法限制或定义 kafka 流应用程序的最大内存使用量?我已经启用了状态存储的缓存,但是当我在 Openshift 中部署时,我的 pod 被 OOM 杀死了。我已经检查过我没有内存泄漏,并且我所有的状态存储迭代器都正在关闭。
我已经将我的 RocksDbConfigSetter 更新为https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options 中的建议,但没有运气。
当我查看 state store 目录时,大小约为 2GB。目前有 50GB 的内存分配给应用程序,但它仍然 OOM
rocksdb ×10
apache-kafka ×2
leveldb ×2
acid ×1
asyncstorage ×1
c++ ×1
cassandra ×1
database ×1
freeze ×1
ios ×1
java ×1
large-data ×1
makefile ×1
mongodb ×1
nosql ×1
react-native ×1
scylla ×1