我目前正在致力于实现隐私保护数据挖掘算法。对于不同方之间的通信部分,我使用的是Netty 4.0。双方之间的通信流程如下所示:
-- multiplicationMsg --> ... -- multiplicationMsg -->
P_{1} P_{N}
<-- multiplicationMsg -- ... <-- multiplicationMsg --
Run Code Online (Sandbox Code Playgroud)
哪里P_{1}是发起和控制整个计算的主方。安全多方乘法的逻辑位于 Netty 中ChannelHandler。还有另一个用于安全添加的协议。
目前,我使用了类似的解决方案像这样,从Netty的核心团队由诺曼·毛雷尔表示,如果一个子协议计算已完成得到通知。但这感觉有点像与框架作斗争。
有没有办法从中获得自定义承诺channel.write(msg),它将在ChannelPipeline? 在我上面的例子中,它应该在multiplicationMsg返回时完成P_{1}。
编辑 1
这是我通常从外部编写消息时所做的ChannelPipeline:
ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
//do something with the future
}
});
Run Code Online (Sandbox Code Playgroud)
在ChannelFuture f从上面的例子将被满足,如果数据可以被写入到插座或者如果发生故障。但我需要一种方法来获取Future除 之外的自定义ChannelFuture,不知何故:
ChannelFuture f = channel.write(msg);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// I need something like the following
if(future.isSuccess()) {
Future myFuture = future.getMyFuture();
}
}
});
Run Code Online (Sandbox Code Playgroud)
小智 5
有很多方法可以做到这一点,这里有一个建立在 netty 之上的例子:
从管道外部,使用IoClient包含ChannelFuture(来自连接初始化)的类(比方说,)内部的公共方法发送消息。该方法看起来像这样:
public MyCustomFuture send(String msg) {
MyCustomFuture responseFuture = new MyCustomFuture();
channelFuture.channel().pipeline().get(MyAppClientHandler.class).setResponseFuture(responseFuture);
channelFuture.channel().writeAndFlush(msg);
return responseFuture;
}
Run Code Online (Sandbox Code Playgroud)
MyCustomFuture是我们创建的实现 nettyFuture接口的自定义类,因此它的实例将代理我们的消息。MyAppClientHandler是要实现承诺的 netty 管道(在 中responseFuture),.setResponseFuture(...)并将代理添加到管道中。
根据通道的初始化,channelFuture.channel()可能仍然是null,给我们一个NullPointerException. 因此,我们需要更改上面的代码以从回调中插入代理:
public MyCustomFuture send(final String msg) {
final MyCustomFuture responseFuture = new MyCustomFuture();
channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelFuture.channel().pipeline()
.get(MyAppClientHandler.class).setResponseFuture(responseFuture);
channelFuture.channel().writeAndFlush(msg);
}
});
return responseFuture;
}
Run Code Online (Sandbox Code Playgroud)
还有一件事MyCustomFuture是它需要一个 setter 方法:
public void set(String msg) throws InterruptedException {
if (state == State.DONE) {
return;
}
blockingReplyHolder.put(msg);
state = State.DONE;
}
Run Code Online (Sandbox Code Playgroud)
blockingReplyHolder,顾名思义,是实现的字段,它保存实现承诺的消息,如果它仍然不存在则阻止(检查Future)
对。现在,当预期的消息到达管道时MyAppClientHandler,我们可以实现如下承诺:
protected void channelRead(ChannelHandlerContext ctx, String msg) throws Exception {
responseFuture.set(msg);
}
Run Code Online (Sandbox Code Playgroud)
由此产生的自定义 API 的用法将是:
MyCustomFuture future = ioClient.send(message);
// do other stuff if needed
String response = future.get(); // waits if necessary
// make use of the response
Run Code Online (Sandbox Code Playgroud)
这个答案来自我正在玩弄的一个例子。
| 归档时间: |
|
| 查看次数: |
2405 次 |
| 最近记录: |