Mos*_*ses 14 java client-server netty
我想写一个基于netty的客户端.它应该有方法public String send(String msg); 哪个应该从服务器或某个未来返回响应 - doesen't不重要.它也应该是多线程的.像这样:
public class Client {
public static void main(String[] args) throws InterruptedException {
Client client = new Client();
}
private Channel channel;
public Client() throws InterruptedException {
EventLoopGroup loopGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder()).
addLast(new StringEncoder()).
addLast(new ClientHandler());
}
});
channel = b.connect("localhost", 9091).sync().channel();
}
public String sendMessage(String msg) {
channel.writeAndFlush(msg);
return ??????????;
}
Run Code Online (Sandbox Code Playgroud)
}
我不知道如何在调用writeAndFlush()之后从服务器检索响应; 我该怎么办?
我也使用Netty 4.0.18.Final
Fer*_*big 14
返回一个Future<String>方法很简单,我们将实现以下方法签名:
public Futute<String> sendMessage(String msg) {
Run Code Online (Sandbox Code Playgroud)
当您熟悉异步编程结构时,相对容易做到.为了解决设计问题,我们将执行以下步骤:
写入消息时,将a添加Promise<String>到ArrayBlockingQueue<Promise>
这将作为最近发送的消息的列表,并允许我们更改Future<String>对象返回结果.
当一条消息返回到处理程序中时,将其解析为 Queue
这使我们能够获得正确的未来变化.
更新状态 Promise<String>
我们调用promise.setSuccess()最终在对象上设置状态,这将传播回未来的对象.
public class ClientHandler extends SimpleChannelInboundHandler<String> {
private ChannelHandlerContext ctx;
private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);
@Override
public void channelActive(ChannelHandlerContext ctx) {
super.channelActive(ctx);
this.ctx = ctx;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
super.channelInactive(ctx);
synchronized(this){
Promise<String> prom;
Exception err = null;
while((prom = messageList.poll()) != null)
prom.setFailure(err != null ? err :
err = new IOException("Connection lost"));
messageList = null;
}
}
public Future<String> sendMessage(String message) {
if(ctx == null)
throw new IllegalStateException();
return sendMessage(message, ctx.executor().newPromise());
}
public Future<String> sendMessage(String message, Promise<String> prom) {
synchronized(this){
if(messageList == null) {
// Connection closed
prom.setFailure(new IllegalStateException());
} else if(messageList.offer(prom)) {
// Connection open and message accepted
ctx.writeAndFlush(message).addListener();
} else {
// Connection open and message rejected
prom.setFailure(new BufferOverflowException());
}
return prom;
}
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) {
synchronized(this){
if(messageList != null) {
messageList.poll().setSuccess(msg);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
private ChannelHandlerContext ctx;
用于存储我们对ChannelHandlerContext的引用,我们使用它来创建promises
private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>();
我们将过去的消息保留在此列表中,以便我们可以更改未来的结果
public void channelActive(ChannelHandlerContext ctx)
当连接变为活动状态时由netty调用.在这里初始化变量.
public void channelInactive(ChannelHandlerContext ctx)
当连接变为非活动状态时由netty调用,由于错误或正常连接关闭.
protected void messageReceived(ChannelHandlerContext ctx, String msg)
当新消息到达时由netty调用,这里挑出队列的头部,然后我们调用setsuccess.
当使用期货时,你需要注意一件事,如果未来尚未完成,不要从1个网络线程调用get(),不遵循这个简单规则将导致死锁或a BlockingOperationException.
小智 0
调用channel.writeAndFlush(msg);已经返回一个 ChannelFuture。要处理此方法调用的结果,您可以向 future 添加一个侦听器,如下所示:
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
Run Code Online (Sandbox Code Playgroud)
(这取自 Netty 文档,请参阅:Netty doc)
| 归档时间: |
|
| 查看次数: |
9480 次 |
| 最近记录: |