Jon*_*nyD 5 java concurrency guava
从Java 8开始,Guava的某些功能已经过时了(例如,字符串连接器,可选值,前提条件检查,Future等)。
在ListenableFuture文档中(自1年前以来没有更新),他们说:
“我们强烈建议您在所有代码中始终使用ListenableFuture而不是Future,因为...”
我在一个旧项目中使用了番石榴(和cassandra),我的问题是:Java 8标准库是否已经ListenableFuture废弃了某些东西,还是仍然是最佳Future选择?谢谢。
小智 0
同意/sf/users/235973601/,CompletableFuture 是 Java 内置的,而 ListenableFuture 则依赖于 Guava。将 ListenableFuture 转换为 CompletableFuture 很容易。
就像我对 com.datastax 驱动程序核心 3.11.0 所做的以下测试更改一样
CompletableFuture initAsync() {
CompletableFuture ret = new CompletableFuture();
如果(工厂.isShutdown){
ret.completeExceptionally(
new ConnectionException(endPoint, "连接工厂已关闭"));
返回ret;
}
协议版本 协议版本 =
工厂.protocolVersion == null
?协议版本.NEWEST_SUPPORTED
: 工厂.协议版本;
尝试 {
Bootstrap bootstrap = 工厂.newBootstrap();
ProtocolOptions 协议选项=factory.configuration.getProtocolOptions();
bootstrap.handler(
新的初始化器(
这,
协议版本,
protocolOptions.getCompression().compressor(),
协议选项.getSSLOptions(),
factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(),
工厂.configuration.getNettyOptions(),
工厂.configuration.getCodecRegistry(),
factory.configuration.getMetricsOptions().isEnabled()
?工厂.经理.指标
: 无效的));
ChannelFuture future = bootstrap.connect(endPoint.resolve());
writer.incrementAndGet();
future.addListener(
新的 ChannelFutureListener() {
@覆盖
公共无效操作完成(ChannelFuture future)抛出异常{
writer.decrementAndGet();
// 注意:future.channel()在某些错误情况下可能为null,所以我们需要警惕
// 它在下面的其余代码中。
频道 = future.channel();
if (isClosed() && 通道!= null) {
渠道
。关闭()
.addListener(
新的 ChannelFutureListener() {
@覆盖
公共无效操作完成(ChannelFuture future)抛出异常{
ret.completeExceptionally(
新的传输异常(
连接.this.endPoint,
“初始化期间连接关闭。”));
}
});
} 别的 {
如果(通道!= null){
Connection.this.factory.allChannels.add(channel);
}
if (!future.isSuccess()) {
if (logger.isDebugEnabled())
记录器.调试(
字符串. 格式(
“%s 连接到 %s%s 时出错”,
连接.this,
连接.this.endPoint,
extractMessage(future.cause())));
ret.completeExceptionally(
新的传输异常(
Connection.this.endPoint, "无法连接", future.cause()));
} 别的 {
断言通道!= null;
记录器.调试(
“{} 连接已建立,正在初始化传输”, Connection.this);
Channel.closeFuture().addListener(new ChannelCloseListener());
ret.complete(null);
}
}
}
});
} catch (RuntimeException e) {
closeAsync().force();
扔 e;
}
执行器 initExecutor =
factory.manager.configuration.getPoolingOptions().getInitializationExecutor();
返回 ret.thenCompose(
无效 -> {
CompletableFuture ret2 = new CompletableFuture();
ProtocolOptions 协议选项=factory.configuration.getProtocolOptions();
未来启动ResponseFuture =
写(
新的请求.Startup(
protocolOptions.getCompression(), protocolOptions.isNoCompact()));
ListenableFuture 通道ReadyFuture =
GuavaCompatibility.INSTANCE.transformAsync(
启动响应未来,
onStartupResponse(协议版本, initExecutor),
初始化执行器);
GuavaCompatibility.INSTANCE.addCallback(
通道ReadyFuture,
新的 FutureCallback() {
@覆盖
公共无效onSuccess(无效结果){
ret2.complete(null);
}
@覆盖
公共无效onFailure(Throwable t){
// 确保连接正确关闭。
if (t instanceof ClusterNameMismatchException
|| t instanceof UnsupportedProtocolVersionException) {
// 只是传播
closeAsync().force();
ret2.completeExceptionally(t);
} 别的 {
// 失效以确保发出错误信号(将主机标记为关闭)
可投掷 e =
(t实例连接异常
|| DriverException 的实例
|| t 中断异常实例
|| t实例错误)
?t
: 新的连接异常(
连接.this.endPoint,
字符串. 格式(
“传输初始化期间出现意外错误 (%s)”,t),
t);
ret2.completeExceptionally(defunct(e));
}
// 确保如果调用者取消返回的 future,连接就会关闭。
如果(!isClosed()){
closeAsync().force();
}
}
});
返回ret2;
});
}
私人 CompletableFuture createPools(集合主机) {
列表> futures = Lists.newArrayListWithCapacity(hosts.size());
对于(主机主机:主机)
if (host.state != Host.State.DOWN) futures.add(maybeAddPool(host, null));
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
}
| 归档时间: |
|
| 查看次数: |
896 次 |
| 最近记录: |