我正在从java驱动程序2.12.3升级到3.3.0.奇怪的是,收集池似乎突然"起作用".
我的设置如下:
Connection在主线程中建立:
mongoClient = new MongoClient(new MongoClientURI("mongodb://localhost:27017"));
mongoClient.setWriteConcern(new WriteConcern(0, 10)); // deprecated, replace soon
database = mongoClient.getDatabase("Example");
// java.util.logging.Logger.getLogger("org.mongodb.driver").setLevel(Level.SEVERE);
Run Code Online (Sandbox Code Playgroud)
它用于数百个线程:
org.bson.Document oldDoc = DBInteractions.readOneFromDb("articles");
Run Code Online (Sandbox Code Playgroud)
使用这样的函数:
static synchronized Document readOneFromDb(String col) {
return database.getCollection(col).find().limit(1).sort(new Document().append("count", 1)).first();
}
Run Code Online (Sandbox Code Playgroud)
对于每次数据库交互,我都会收到这样的警告:
Sep 26, 2016 2:33:19 PM com.mongodb.diagnostics.logging.JULLogger log
INFORMATION: Closed connection [connectionId{localValue:42, serverValue:248}] to localhost:27017 because the pool has been closed.
Run Code Online (Sandbox Code Playgroud)
看起来好像连接池仅在一次交互后关闭.但为什么?很困惑任何一个想法?
我刚刚开始使用redis,生菜和异步编码.现在遗憾的是,我找不到任何关于如何从侦听器获取消息到我的程序中的示例.我在这些函数上找到的javadoc或任何其他信息也没有多大帮助.那么有人可以解释如何将已发布的消息转换为字符串吗?
我的代码目前看起来像这样:
RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
Run Code Online (Sandbox Code Playgroud)
我很确定我必须实现监听器的消息方法,但我甚至没有任何线索来开始.我知道params代表什么......但是这些方法的返回值为void,因此它们也不会输出任何消息.
那么,哪里开始呢?(完全不解)
对于不耐烦的读者:这是一项正在进行的工作,我在此过程中寻求帮助。请不要根据我的临时数据来判断这些工具,因为在我尝试获得更好的结果时它们可能会发生变化。
我们正处于架构决策过程的中间,该工具用于分析协同仿真的输出。
作为该过程的一部分,我被要求编写一个基准测试工具,并获取有关多个分布式处理框架速度的数据。
我测试的框架是:Apache Spark、Apache Flink、Hazelcast Jet。并作为比较基准纯 Java。
我使用的测试用例是一个简单的“这是一个 Pojo 列表,pojo 中的一个字段是双精度值。找到最小的(最小)值”。
简单,直接,希望具有高度可比性。
四分之三的测试使用一个简单的比较器,第四个(flink)使用与比较器基本相同的减速器。分析函数如下所示:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();
Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();
Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();
Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
Run Code Online (Sandbox Code Playgroud)
我对此进行了广泛的测试,改变了测试列表的大小以及分配的资源。结果让我大吃一惊。最佳结果如下(所有数字以毫秒为单位,1 个 mio pojo,每个 10 个测试):
结果:
java:
Instances:
List:
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16,
Overall: 111, 24, …Run Code Online (Sandbox Code Playgroud)