番石榴的ListenableFuture是否仍然有用/最新(在Java 8之后)?

Jon*_*nyD 5 java concurrency guava

从Java 8开始,Guava的某些功能已经过时了(例如,字符串连接器,可选值,前提条件检查,Future等)。

ListenableFuture文档中(自1年前以来没有更新),他们说:

“我们强烈建议您在所有代码中始终使用ListenableFuture而不是Future,因为...”

我在一个项目中使用了番石榴(和cassandra),我的问题是:Java 8标准库是否已经ListenableFuture废弃了某些东西,还是仍然是最佳Future选择?谢谢。

小智 0

同意/sf/users/235973601/,CompletableFuture 是 Java 内置的,而 ListenableFuture 则依赖于 Guava。将 ListenableFuture 转换为 CompletableFuture 很容易。

  1. 在ListenableFuture可监听回调中创建CompletableFuture、complete()或completeExceptionally()。
  2. 使用 thenCompose 将 CompletableFuture 与 ListenableFuture 链接起来
  3. 使用 allOf 替换 Futures.allAsList

就像我对 com.datastax 驱动程序核心 3.11.0 所做的以下测试更改一样

CompletableFuture initAsync() {

    CompletableFuture ret = new CompletableFuture();
    如果(工厂.isShutdown){
      ret.completeExceptionally(
          new ConnectionException(endPoint, "连接工厂已关闭"));
      返回ret;
    }

    协议版本 协议版本 =
        工厂.protocolVersion == null
            ?协议版本.NEWEST_SUPPORTED
            : 工厂.协议版本;

    尝试 {
      Bootstrap bootstrap = 工厂.newBootstrap();
      ProtocolOptions 协议选项=factory.configuration.getProtocolOptions();
      bootstrap.handler(
          新的初始化器(
              这,
              协议版本,
              protocolOptions.getCompression().compressor(),
              协议选项.getSSLOptions(),
              factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(),
              工厂.configuration.getNettyOptions(),
              工厂.configuration.getCodecRegistry(),
              factory.configuration.getMetricsOptions().isEnabled()
                  ?工厂.经理.指标
                  : 无效的));

      ChannelFuture future = bootstrap.connect(endPoint.resolve());

      writer.incrementAndGet();
      future.addListener(
          新的 ChannelFutureListener() {
            @覆盖
            公共无效操作完成(ChannelFuture future)抛出异常{
              writer.decrementAndGet();
              // 注意:future.channel()在某些错误情况下可能为null,所以我们需要警惕
              // 它在下面的其余代码中。
              频道 = future.channel();
              if (isClosed() && 通道!= null) {
                渠道
                    。关闭()
                    .addListener(
                        新的 ChannelFutureListener() {
                          @覆盖
                          公共无效操作完成(ChannelFuture future)抛出异常{

                            ret.completeExceptionally(
                                新的传输异常(
                                    连接.this.endPoint,
                                    “初始化期间连接关闭。”));
                          }
                        });
              } 别的 {
                如果(通道!= null){
                  Connection.this.factory.allChannels.add(channel);
                }
                if (!future.isSuccess()) {
                  if (logger.isDebugEnabled())
                    记录器.调试(
                        字符串. 格式(
                            “%s 连接到 %s%s 时出错”,
                            连接.this,
                            连接.this.endPoint,
                            extractMessage(future.cause())));
                  ret.completeExceptionally(
                      新的传输异常(
                          Connection.this.endPoint, "无法连接", future.cause()));
                } 别的 {
                  断言通道!= null;
                  记录器.调试(
                      “{} 连接已建立,正在初始化传输”, Connection.this);
                  Channel.closeFuture().addListener(new ChannelCloseListener());
                  ret.complete(null);
                }
              }
            }
          });
    } catch (RuntimeException e) {
      closeAsync().force();
      扔 e;
    }

    执行器 initExecutor =
        factory.manager.configuration.getPoolingOptions().getInitializationExecutor();

    返回 ret.thenCompose(
            无效 -> {
              CompletableFuture ret2 = new CompletableFuture();
              ProtocolOptions 协议选项=factory.configuration.getProtocolOptions();
              未来启动ResponseFuture =
                  写(
                      新的请求.Startup(
                          protocolOptions.getCompression(), protocolOptions.isNoCompact()));

              ListenableFuture 通道ReadyFuture =
                  GuavaCompatibility.INSTANCE.transformAsync(
                      启动响应未来,
                      onStartupResponse(协议版本, initExecutor),
                      初始化执行器);

              GuavaCompatibility.INSTANCE.addCallback(
                      通道ReadyFuture,
                      新的 FutureCallback() {
                        @覆盖
                        公共无效onSuccess(无效结果){
                          ret2.complete(null);
                        }

                        @覆盖
                        公共无效onFailure(Throwable t){

                          // 确保连接正确关闭。
                          if (t instanceof ClusterNameMismatchException
                                  || t instanceof UnsupportedProtocolVersionException) {
                            // 只是传播
                            closeAsync().force();
                            ret2.completeExceptionally(t);
                          } 别的 {
                            // 失效以确保发出错误信号(将主机标记为关闭)
                            可投掷 e =
                                    (t实例连接异常
                                            || DriverException 的实例
                                            || t 中断异常实例
                                            || t实例错误)
                                            ?t
                                            : 新的连接异常(
                                            连接.this.endPoint,
                                            字符串. 格式(
                                                    “传输初始化期间出现意外错误 (%s)”,t),
                                            t);
                            ret2.completeExceptionally(defunct(e));
                          }
                          // 确保如果调用者取消返回的 future,连接就会关闭。
                          如果(!isClosed()){
                            closeAsync().force();
                          }


                        }
                      });
              返回ret2;
            });
  }

私人 CompletableFuture createPools(集合主机) {
    列表> futures = Lists.newArrayListWithCapacity(hosts.size());
    对于(主机主机:主机)
      if (host.state != Host.State.DOWN) futures.add(maybeAddPool(host, null));
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
  }