非阻塞套接字

jas*_*ine 11 java sockets network-programming nonblocking blocking

在Java中实现非阻塞套接字的最佳方法是什么?

还是有这样的事情?我有一个通过套接字与服务器通信的程序,但是如果数据/连接出现问题,我不希望套接字调用阻塞/导致延迟.

Teo*_*cci 15

Java 2 Standard Edition 1.4中引入的Java 非阻塞套接字允许应用程序之间的网络通信,而不会阻止使用套接字的进程.但是什么是非阻塞套接字,它在哪些上下文中有用,以及它是如何工作的?

什么是非阻塞套接字?

非阻塞套接字允许在通道上进行I/O操作,而不会阻止使用它的进程.这意味着,我们可以使用单个线程来处理多个并发连接并获得"异步高性能"读/写操作(有些人可能不同意)

好的,在哪些情况下它可能有用?

假设您希望实现接受不同客户端连接的服务器.同时假设您希望服务器能够同时处理多个请求.使用传统方式,您有两种选择来开发这样的服务器:

  • 实现一个多线程服务器,为每个连接手动处理一个线程.
  • 使用外部第三方模块.

这两种解决方案都有效,但是采用第一种解决方案来开发整个线程管理解决方案,具有相关的并发性和冲突问题.第二种解决方案使应用程序依赖于非JDK外部模块,可能您必须使库适应您的需求.通过非阻塞套接字,您可以实现非阻塞服务器,而无需直接管理线程或使用外部模块.

这个怎么运作?

在详细介绍之前,您需要了解的术语很少:

  • 在基于NIO的实现中,我们不是将数据写入输出流并从输入流中读取数据,而是从缓冲区读取和写入数据.甲缓冲器可以被定义为一个临时存储.
  • 通道将大量数据传入和传出缓冲区.此外,它可以被视为通信的端点.
  • 准备选择是一个概念,它指的是"选择在读取或写入数据时不会阻塞的套接字的能力".

Java NIO有一个名为的类Selector,允许单个线程检查多个通道上的I/O事件.这怎么可能?好吧,selector可以检查通道的"准备就绪",例如客户端尝试连接或读/写操作.也就是说,每个实例都Selector可以监视更多的套接字通道,从而监控更多连接.现在,当通道上发生某些事件(发生事件)时,selector通知应用程序处理请求.的selector实现方法是创建它的事件的键(或选择键),其为实例SelectionKey的类.每个都key保存有关谁发出请求以及请求的类型的信息,如图1所示.

图1:结构图 图1:结构图

一个基本的实现

服务器实现由无限循环组成,其中selector等待事件并创建事件密钥.密钥有四种可能的类型:

  • 可接受:关联的客户端请求连接.
  • 可连接:服务器接受连接.
  • 可读:服务器可以读取.
  • 可写:服务器可以写.

通常acceptable在服务器端创建密钥.实际上,这种密钥只是简单地通知服务器客户端需要连接,然后服务器将套接字通道个性化并将其与选择器相关联以进行读/写操作.在此之后,当接受的客户端读取或写入某些内容时,选择器将为该客户端创建readablewriteable键.

现在,您已准备好按照提议的算法用Java编写服务器.可以通过以下方式创建套接字通道,selector套接字选择器注册:

final String HOSTNAME = "127.0.0.1";
final int PORT = 8511;

// This is how you open a ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// You MUST configure as non-blocking or else you cannot register the serverChannel to the Selector.
serverChannel.configureBlocking(false);
// bind to the address that you will use to Serve.
serverChannel.socket().bind(new InetSocketAddress(HOSTNAME, PORT));

// This is how you open a Selector
selector = Selector.open();
/*
 * Here you are registering the serverSocketChannel to accept connection, thus the OP_ACCEPT.
 * This means that you just told your selector that this channel will be used to accept connections.
 * We can change this operation later to read/write, more on this later.
 */
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
Run Code Online (Sandbox Code Playgroud)

首先,我们创建SocketChannelwith ServerSocketChannel.open()方法的实例.接下来,configureBlocking(false)调用将其设置channel 为非阻塞.与服务器的连接是通过serverChannel.socket().bind()方法建立的.它HOSTNAME代表服务器的IP地址,PORT是通信端口.最后,调用Selector.open()方法来创建selector实例并将其注册到channel和注册类型.在此示例中,注册类型是OP_ACCEPT,这意味着选择器仅报告客户端尝试连接到服务器.其他可能的选择是:OP_CONNECT,将由客户使用; OP_READ; 和OP_WRITE.

现在我们需要使用无限循环来处理这些请求.一个简单的方法如下:

// Run the server as long as the thread is not interrupted.
while (!Thread.currentThread().isInterrupted()) {
    /*
     * selector.select(TIMEOUT) is waiting for an OPERATION to be ready and is a blocking call.
     * For example, if a client connects right this second, then it will break from the select()
     * call and run the code below it. The TIMEOUT is not needed, but its just so it doesn't
     * block undefinable.
     */
    selector.select(TIMEOUT);

    /*
     * If we are here, it is because an operation happened (or the TIMEOUT expired).
     * We need to get the SelectionKeys from the selector to see what operations are available.
     * We use an iterator for this.
     */
    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();

    while (keys.hasNext()) {
        SelectionKey key = keys.next();
        // remove the key so that we don't process this OPERATION again.
        keys.remove();

        // key could be invalid if for example, the client closed the connection.
        if (!key.isValid()) {
            continue;
        }
        /*
         * In the server, we start by listening to the OP_ACCEPT when we register with the Selector.
         * If the key from the keyset is Acceptable, then we must get ready to accept the client
         * connection and do something with it. Go read the comments in the accept method.
         */
        if (key.isAcceptable()) {
            System.out.println("Accepting connection");
            accept(key);
        }
        /*
         * If you already read the comments in the accept() method, then you know we changed
         * the OPERATION to OP_WRITE. This means that one of these keys in the iterator will return
         * a channel that is writable (key.isWritable()). The write() method will explain further.
         */
        if (key.isWritable()) {
            System.out.println("Writing...");
            write(key);
        }
        /*
         * If you already read the comments in the write method then you understand that we registered
         * the OPERATION OP_READ. That means that on the next Selector.select(), there is probably a key
         * that is ready to read (key.isReadable()). The read() method will explain further.
         */
        if (key.isReadable()) {
            System.out.println("Reading connection");
            read(key);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您可以在此处找到实施源

注意:异步服务器

作为非阻塞实现的替代方案,我们可以部署异步服务器.例如,您可以使用AsynchronousServerSocketChannel该类,该类为面向流的侦听套接字提供异步通道.

要使用它,首先执行其静态open()方法,然后执行bind()到特定端口.接下来,您将执行其accept()方法,并向其传递一个实现该CompletionHandler接口的类.通常,您会发现将处理程序创建为匿名内部类.

从这个AsynchronousServerSocketChannel对象,你调用accept()它来告诉它开始侦听连接,并向它传递一个自定义CompletionHandler实例.当我们调用时accept(),它立即返回.请注意,这与传统的阻塞方法不同; 虽然accept()方法被阻塞,直到客户端连接到它,该AsynchronousServerSocketChannel accept()方法为您处理它.

这里有一个例子:

public class NioSocketServer
{
    public NioSocketServer()
    {
        try {
            // Create an AsynchronousServerSocketChannel that will listen on port 5000
            final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel
                    .open()
                    .bind(new InetSocketAddress(5000));

            // Listen for a new request
            listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>()
            {
                @Override
                public void completed(AsynchronousSocketChannel ch, Void att)
                {
                    // Accept the next connection
                    listener.accept(null, this);

                    // Greet the client
                    ch.write(ByteBuffer.wrap("Hello, I am Echo Server 2020, let's have an engaging conversation!\n".getBytes()));

                    // Allocate a byte buffer (4K) to read from the client
                    ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
                    try {
                        // Read the first line
                        int bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);

                        boolean running = true;
                        while (bytesRead != -1 && running) {
                            System.out.println("bytes read: " + bytesRead);

                            // Make sure that we have data to read
                            if (byteBuffer.position() > 2) {
                                // Make the buffer ready to read
                                byteBuffer.flip();

                                // Convert the buffer into a line
                                byte[] lineBytes = new byte[bytesRead];
                                byteBuffer.get(lineBytes, 0, bytesRead);
                                String line = new String(lineBytes);

                                // Debug
                                System.out.println("Message: " + line);

                                // Echo back to the caller
                                ch.write(ByteBuffer.wrap(line.getBytes()));

                                // Make the buffer ready to write
                                byteBuffer.clear();

                                // Read the next line
                                bytesRead = ch.read(byteBuffer).get(20, TimeUnit.SECONDS);
                            } else {
                                // An empty line signifies the end of the conversation in our protocol
                                running = false;
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        // The user exceeded the 20 second timeout, so close the connection
                        ch.write(ByteBuffer.wrap("Good Bye\n".getBytes()));
                        System.out.println("Connection timed out, closing connection");
                    }

                    System.out.println("End of conversation");
                    try {
                        // Close the connection if we need to
                        if (ch.isOpen()) {
                            ch.close();
                        }
                    } catch (I/OException e1)
                    {
                        e1.printStackTrace();
                    }
                }

                @Override
                public void failed(Throwable exc, Void att)
                {
                    ///...
                }
            });
        } catch (I/OException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)
    {
        NioSocketServer server = new NioSocketServer();
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

你可以在这里找到完整的代码


use*_*421 6

其中一些答案是不正确的.SocketChannel.configureBlocking(false)将其置于非阻塞模式.您不需要选择器来执行此操作.您只需要一个Selector来实现超时或带有非阻塞套接字的多路复用 I/O.


sec*_*ask -1

java.nio 包提供了Selector,其工作方式与 C 中非常相似。