How do I keep reading from an AsynchronousSocketChannel?

fly*_*ell 5 java nio2 java-nio

I need to connect to 4 machines and read data from their sockets. I chose to use the asynchronous model of NIO.2.

Here is some pseudocode:

class Connector {

    private final AsynchronousChannelGroup group;
    private final String host;
    private final int port;
    //...
    
    public void connect() {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

            client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);

            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }

        } catch (Exception e) {
            //
        }
    }

    private class ReadHandler implements CompletionHandler<Integer, Attachement> {

        final AsynchronousSocketChannel channel;

        ReadHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, Attachement attachment) {
            if (result < 0) { // -1 means the peer closed the connection
                closeChannel();
                return;
            }
            attachment.buffer.clear();
            channel.read(attachment.buffer, 5, TimeUnit.SECONDS, attachment, this);
        }

        @Override
        public void failed(Throwable exc, Attachement attachment) {
            closeChannel();
        }

        void closeChannel() {
            try {
                if (channel.isOpen()) {
                    channel.close();
                }
            } catch (IOException e) {
                // do something else
            }
        }
    }
}

To run this code, I create a new group and pass it to all of the Connectors:

ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executor);

//for each server
Connector connectorX = new Connector(group, serverX.host, serverX.port);

Problem:

After the first read (the line marked with the //#1 comment), the thread is released.
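A likely cause, stated here as an assumption rather than something confirmed in the post: read() only starts the operation and returns, so the try-with-resources block exits and closes the channel while the read is still pending. The loopback sketch below captures whatever exception reaches failed() in that situation. The class name EarlyCloseDemo, the silent local server, and the demo() helper are illustrative and not part of the original code. According to the AsynchronousChannel.close() documentation, a pending operation on a closed channel completes with AsynchronousCloseException.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class EarlyCloseDemo {

    /** Returns the exception delivered to failed() once the try block has closed the channel. */
    static Throwable demo() {
        CompletableFuture<Throwable> failure = new CompletableFuture<>();
        // Hypothetical stand-in for a remote machine: a local server that never sends data.
        try (AsynchronousServerSocketChannel server =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))) {
            Future<AsynchronousSocketChannel> accepted = server.accept();

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                // Starts the read and returns immediately; nothing has been received yet.
                client.read(buffer, 5, TimeUnit.SECONDS, buffer,
                    new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer n, ByteBuffer b) {
                            failure.complete(null); // not expected: the server is silent
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer b) {
                            failure.complete(exc);
                        }
                    });
            } // client.close() runs here, while the read is still pending

            Throwable delivered = failure.get(10, TimeUnit.SECONDS);
            accepted.get(5, TimeUnit.SECONDS).close();
            return delivered;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If that is what happens in connect(), the handler never gets a chance to re-arm the read, which would match the symptom described above.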

Solution!

To work around this, I introduced a CountDownLatch so that the thread waits while the reads are in progress:


//...
private CountDownLatch latch; 

public void connect() {

        latch = new CountDownLatch(1);

        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {

            client.connect(new InetSocketAddress(host(), port())).get(5, TimeUnit.SECONDS);

            if (!group.isShutdown()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                client.read(buffer, 5, TimeUnit.SECONDS, new Attachement(buffer), new ReadHandler(client)); //#1
            }
            
            latch.await();

        } catch (Exception e) {
            //...
            latch.countDown();
        }
}

Question:

Is this (the CountDownLatch) the right way to solve the problem? If not, what is the best way?

Doesn't blocking the thread (latch.await()) contradict the asynchronous model?
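For comparison, here is a minimal sketch of one non-blocking shape, offered as an assumption about what could work rather than a confirmed answer. The channel is opened outside try-with-resources, and the handler owns its lifetime: it re-arms the read after each completion and closes the channel on end-of-stream or failure. The names KeepReadingSketch, ReadLoop, start() and demo() are hypothetical, the ByteBuffer itself stands in for the Attachement class, and the loopback server exists only to make the example self-contained.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class KeepReadingSketch {

    /** Re-arms the read after every completion and closes the channel when done. */
    static final class ReadLoop implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;
        private final CompletableFuture<Integer> done = new CompletableFuture<>();
        private final AtomicInteger total = new AtomicInteger();

        ReadLoop(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        /** Starts the first read; the future completes with the total byte count. */
        CompletableFuture<Integer> start() {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer, 5, TimeUnit.SECONDS, buffer, this);
            return done;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if (result < 0) { // -1: the peer closed the connection
                close();
                done.complete(total.get());
                return;
            }
            total.addAndGet(result); // a real handler would process the bytes here
            buffer.clear();
            channel.read(buffer, 5, TimeUnit.SECONDS, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            close();
            done.completeExceptionally(exc);
        }

        private void close() {
            try {
                channel.close();
            } catch (IOException ignored) {
                // nothing useful to do in a sketch
            }
        }
    }

    /** Loopback demo: a throwaway local server sends five bytes, then closes. */
    static int demo() {
        try (AsynchronousServerSocketChannel server =
                 AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 0))) {
            // Deliberately NOT in try-with-resources: the ReadLoop closes it.
            AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
            Future<AsynchronousSocketChannel> accepted = server.accept();
            client.connect(server.getLocalAddress()).get(5, TimeUnit.SECONDS);

            try (AsynchronousSocketChannel peer = accepted.get(5, TimeUnit.SECONDS)) {
                ByteBuffer hello = ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII));
                peer.write(hello).get(5, TimeUnit.SECONDS);
            } // closing the peer produces end-of-stream on the client side

            // Blocking here is only so the demo can return a value.
            return new ReadLoop(client).start().get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes read: " + demo());
    }
}
```

In this shape a caller does not have to wait at all: it can attach a callback with start().thenAccept(...) and return, so no latch is needed to keep the channel open. Whether something still has to keep the JVM alive depends on the threads backing the channel group, which the sketch does not address.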