有没有办法从 Netty 中的 channel.write() 返回自定义承诺?

kun*_*erd 2 netty

我目前正在致力于实现隐私保护数据挖掘算法。对于不同方之间的通信部分,我使用的是Netty 4.0。双方之间的通信流程如下所示:

         -- multiplicationMsg --> ... -- multiplicationMsg -->
   P_{1}                                                       P_{N}
         <-- multiplicationMsg -- ... <-- multiplicationMsg --
Run Code Online (Sandbox Code Playgroud)

哪里P_{1}是发起和控制整个计算的主方。安全多方乘法的逻辑位于 Netty 中ChannelHandler。还有另一个用于安全添加的协议。

目前,我使用了类似的解决方案像这样,从Netty的核心团队由诺曼·毛雷尔表示,如果一个子协议计算已完成得到通知。但这感觉有点像与框架作斗争。

有没有办法从中获得自定义承诺channel.write(msg),它将在ChannelPipeline? 在我上面的例子中,它应该在multiplicationMsg返回时完成P_{1}

编辑 1

这是我通常从外部编写消息时所做的ChannelPipeline

ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
         //do something with the future
    }
});
Run Code Online (Sandbox Code Playgroud)

ChannelFuture f从上面的例子将被满足,如果数据可以被写入到插座或者如果发生故障。但我需要一种方法来获取Future除 之外的自定义ChannelFuture,不知何故:

ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
         // I need something like the following
         if(future.isSuccess()) {
             Future myFuture = future.getMyFuture();
         }
    }
});
Run Code Online (Sandbox Code Playgroud)

小智 5

有很多方法可以做到这一点,这里有一个建立在 netty 之上的例子:

从管道外部,使用IoClient包含ChannelFuture(来自连接初始化)的类(比方说,)内部的公共方法发送消息。该方法看起来像这样:

public MyCustomFuture send(String msg) {
  MyCustomFuture responseFuture = new MyCustomFuture();

  channelFuture.channel().pipeline().get(MyAppClientHandler.class).setResponseFuture(responseFuture);
  channelFuture.channel().writeAndFlush(msg);   

  return responseFuture;
}
Run Code Online (Sandbox Code Playgroud)

MyCustomFuture是我们创建的实现 nettyFuture接口的自定义类,因此它的实例将代理我们的消息。MyAppClientHandler是要实现承诺的 netty 管道(在 中responseFuture),.setResponseFuture(...)并将代理添加到管道中。

根据通道的初始化,channelFuture.channel()可能仍然是null,给我们一个NullPointerException. 因此,我们需要更改上面的代码以从回调中插入代理:

public MyCustomFuture send(final String msg) {
  final MyCustomFuture responseFuture = new MyCustomFuture();

  channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      channelFuture.channel().pipeline()
          .get(MyAppClientHandler.class).setResponseFuture(responseFuture);
      channelFuture.channel().writeAndFlush(msg);                               
    }
  });

  return responseFuture;
}
Run Code Online (Sandbox Code Playgroud)

还有一件事MyCustomFuture是它需要一个 setter 方法:

public void set(String msg) throws InterruptedException {
  if (state == State.DONE) {
    return;
  }
  blockingReplyHolder.put(msg);
  state = State.DONE;
}
Run Code Online (Sandbox Code Playgroud)

blockingReplyHolder,顾名思义,是实现的字段,它保存实现承诺的消息,如果它仍然不存在则阻止(检查Future

对。现在,当预期的消息到达管道时MyAppClientHandler,我们可以实现如下承诺:

protected void channelRead(ChannelHandlerContext ctx, String msg) throws Exception {
    responseFuture.set(msg);
}
Run Code Online (Sandbox Code Playgroud)

由此产生的自定义 API 的用法将是:

MyCustomFuture future = ioClient.send(message);
// do other stuff if needed
String response = future.get(); // waits if necessary
// make use of the response
Run Code Online (Sandbox Code Playgroud)

这个答案来自我正在玩弄的一个例子