如何知道Netty ByteBuf中是否没有数据可读?

Lei*_*ong 6 java netty

我是Netty的新手.文件传输存在一个问题让我困惑了好几天.我想将图像文件从客户端发送到服务器.

以下代码是可执行的.但只有我强行关闭服务器才能正常打开收到的图像文件.否则,它显示" 看起来您没有权限查看此文件.请检查权限并重试 ".因此,当ByteBuf中没有数据使用ByteBuf.isReadable()时,我想关闭fileoutputstream ,但ServerHandler中方法channelRead中的else块永远不会到达.这毫无用处.

此外,如果发送的文本文件,它可以正常开启时,服务器是活着的.我不希望每次转移后都关闭服务器.请给我一些解决方案的建议.

这是FileClientHandler

public class FileClientHandler extends ChannelInboundHandlerAdapter {

private int readLength = 8;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    sendFile(ctx.channel());
}

private void sendFile(Channel channel) throws IOException {
    File file = new File("C:\\Users\\xxx\\Desktop\\1.png");
    FileInputStream fis = new FileInputStream(file);
    BufferedInputStream bis = new BufferedInputStream(fis);

    for (;;) {
        byte[] bytes = new byte[readLength];
        int readNum = bis.read(bytes, 0, readLength);
        // System.out.println(readNum);
        if (readNum == -1) {
            bis.close();
            fis.close();
            return;
        }
        sendToServer(bytes, channel, readNum);
    }
}

private void sendToServer(byte[] bytes, Channel channel, int length)
        throws IOException {
    channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length));
}

}
Run Code Online (Sandbox Code Playgroud)

这是FileServerHandler

public class FileServerHandler extends ChannelInboundHandlerAdapter {

private File file = new File("C:\\Users\\xxx\\Desktop\\2.png");
private FileOutputStream fos;

public FileServerHandler() {
    try {
        if (!file.exists()) {
            file.createNewFile();
        } else {
            file.delete();
            file.createNewFile();
        }
        fos = new FileOutputStream(file);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    try {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isReadable()) {
            buf.readBytes(fos, buf.readableBytes());
            fos.flush();
        } else {
            System.out.println("I want to close fileoutputstream!");
            buf.release();
            fos.flush();
            fos.close();
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
Run Code Online (Sandbox Code Playgroud)

Fer*_*big 3

修复服务器端

在Netty世界中,有多个“事件”:

在这些“事件”中,您可能已经知道它们的channelRead作用(自从您使用它以来),但您似乎需要的另一个是channelInactive. 当另一个端点关闭连接时会调用此函数,您可以像这样使用它:

@Override
public void channelInactive(ctx) {
    System.out.println("I want to close fileoutputstream!");
    fos.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    try {
        ByteBuf buf = (ByteBuf) msg;
        // if (buf.isReadable()) { // a buf should always be readable here
            buf.readBytes(fos, buf.readableBytes());
        //    fos.flush(); // flushing is always done when closing
        //} else {
        //    System.out.println("I want to close fileoutputstream!");
        //    buf.release(); // Should be placed in the finally block
        //    fos.flush();
        //    fos.close();
        //}
    } catch (Exception e) {
         e.printStackTrace();
    } finally {
         buf.release(); // Should always be done, even if writing to the file fails
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,服务器如何知道连接已关闭?目前,客户端不会关闭服务器,而是继续在后台运行,永远保持连接处于活动状态。

修复客户端

为了正确关闭客户端的连接,我们需要调用channel.close(),但是,我们不能直接在返回行之前插入它,因为这会导致发送数据和关闭网络层连接之间的竞争条件,可能会丢失数据。

为了正确处理这些情况,Netty 使用了一个Future系统,该系统允许代码在异步操作发生后处理事件。

幸运的是,Netty 已经为此提供了内置解决方案,我们只需将其连接起来即可。为了将此解决方案连接到我们的代码,我们必须跟踪ChannelFutureNetty 的 write 方法发出的最新消息。

为了正确实现这个解决方案,我们更改sendToServer为返回 write 方法的结果:

private ChannelFuture sendToServer(byte[] bytes, Channel channel, int length)
        throws IOException {
    return channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length));
}
Run Code Online (Sandbox Code Playgroud)

然后我们跟踪这个返回值,并在我们想要关闭连接时添加一个包含 Netty 内置解决方案的监听器:

ChannelFuture lastFuture = null;
for (;;) {
    byte[] bytes = new byte[readLength];
    int readNum = bis.read(bytes, 0, readLength);
    // System.out.println(readNum);
    if (readNum == -1) {
        bis.close();
        fis.close();
        if(lastFuture == null) { // When our file is 0 bytes long, this is true
            channel.close();
        } else {
            lastFuture.addListener(ChannelFutureListener.CLOSE);
        }
        return;
    }
    lastFuture = sendToServer(bytes, channel, readNum);
}
Run Code Online (Sandbox Code Playgroud)