如何使用netty客户端获取服务器响应

Mos*_*ses 14 java client-server netty

我想写一个基于netty的客户端.它应该有方法public String send(String msg); 哪个应该从服务器或某个未来返回响应 - doesen't不重要.它也应该是多线程的.像这样:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}
Run Code Online (Sandbox Code Playgroud)

}

我不知道如何在调用writeAndFlush()之后从服务器检索响应; 我该怎么办?

我也使用Netty 4.0.18.Final

Fer*_*big 14

返回一个Future<String>方法很简单,我们将实现以下方法签名:

public Futute<String> sendMessage(String msg) {
Run Code Online (Sandbox Code Playgroud)

当您熟悉异步编程结构时,相对容易做到.为了解决设计问题,我们将执行以下步骤:

  1. 写入消息时,将a添加Promise<String>ArrayBlockingQueue<Promise>

    这将作为最近发送的消息的列表,并允许我们更改Future<String>对象返回结果.

  2. 当一条消息返回到处理程序中时,将其解析为 Queue

    这使我们能够获得正确的未来变化.

  3. 更新状态 Promise<String>

    我们调用promise.setSuccess()最终在对象上设置状态,这将传播回未来的对象.

示例代码

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    private ChannelHandlerContext ctx;
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        super.channelInactive(ctx);
        synchronized(this){
            Promise<String> prom;
            Exception err = null;
            while((prom = messageList.poll()) != null) 
                prom.setFailure(err != null ? err : 
                    err = new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null) 
            throw new IllegalStateException();
        return sendMessage(message, ctx.executor().newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) { 
                // Connection open and message accepted
                ctx.writeAndFlush(message).addListener();
            } else { 
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                 messageList.poll().setSuccess(msg);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

文档细分

  • private ChannelHandlerContext ctx;

    用于存储我们对ChannelHandlerContext的引用,我们使用它来创建promises

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>();

    我们将过去的消息保留在此列表中,以便我们可以更改未来的结果

  • public void channelActive(ChannelHandlerContext ctx)

    当连接变为活动状态时由netty调用.在这里初始化变量.

  • public void channelInactive(ChannelHandlerContext ctx)

    当连接变为非活动状态时由netty调用,由于错误或正常连接关闭.

  • protected void messageReceived(ChannelHandlerContext ctx, String msg)

    当新消息到达时由netty调用,这里挑出队列的头部,然后我们调用setsuccess.

警告提醒

当使用期货时,你需要注意一件事,如果未来尚未完成,不要从1个网络线程调用get(),不遵循这个简单规则将导致死锁或a BlockingOperationException.

  • 对此有两点需要注意:1)所使用的协议必须保证服务器按照接收请求的顺序发送响应,2)请求仅发送到单个服务器并从单个服务器接收(否则1将自行断开)各种服务器之间的排序可能不再是同步的.由于可以使用单个Bootstrap连接到多个服务器,所以第二个可能是一个问题,尽管每个连接将产生自己的通道,因此应该可以为每个通道设置单独的队列来解决这个假设(1)确实成立. (4认同)

小智 0

调用channel.writeAndFlush(msg);已经返回一个 ChannelFuture。要处理此方法调用的结果,您可以向 future 添加一个侦听器,如下所示:

future.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) {
        // Perform post-closure operation
        // ...
    }
}); 
Run Code Online (Sandbox Code Playgroud)

(这取自 Netty 文档,请参阅:Netty doc

  • 但是如何从 ChannelFuture 获取服务器响应呢? (3认同)