Java 非阻塞 IO 选择器导致通道寄存器阻塞

use*_*301 5 java sockets nonblocking

我有两个线程正在处理用于非阻塞套接字的 Java NIO。这是线程正在做的事情:

线程 1:调用选择器的 select() 方法的循环。如果有任何密钥可用,则相应地处理它们。

线程 2:偶尔会通过调用 register() 向选择器注册一个 SocketChannel。

问题是,除非 select() 的超时时间非常小(比如大约 100 毫秒),否则对 register() 的调用将无限期阻塞。即使通道被配置为非阻塞,并且 javadocs 声明 Selector 对象是线程安全的(但它的选择键不是,我知道)。

所以有人对这个问题有什么想法吗?如果我将所有内容都放在一个线程中,该应用程序就可以完美运行。没有问题发生,但我真的很想有单独的线程。任何帮助表示赞赏。我在下面发布了我的示例代码:

将 select(1000) 更改为 select(100) 它将起作用。将其保留为 select() 或 select(1000) ,它不会。


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UDPSocket { private DatagramChannel clientChannel; private String dstHost; private int dstPort; private static Selector recvSelector; private static volatile boolean initialized; private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();

public static void init() { initialized = true;

try { recvSelector = Selector.open(); } catch (IOException e) { System.err.println(e); }

Thread t = new Thread(new Runnable() { @Override public void run() { while(initialized) { readData(); Thread.yield(); } }
}); t.start(); }

public static void shutdown() { initialized = false; }

private static void readData() { try { int numKeys = recvSelector.select(1000);

if (numKeys > 0) { Iterator i = recvSelector.selectedKeys().iterator();

while(i.hasNext())
{
 SelectionKey key = i.next();
 i.remove();

 if (key.isValid() && key.isReadable())
 {
  DatagramChannel channel = (DatagramChannel) key.channel();

  // allocate every time we receive so that it's a copy that won't get erased
  final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
  channel.receive(buffer);
  buffer.flip();
  final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

  // let user handle event on a dedicated thread
  eventQueue.execute(new Runnable()
  {
   @Override
   public void run() 
   {
    subscriber.onData(buffer);
   }       
  });
 }
}
Run Code Online (Sandbox Code Playgroud)

} } catch (IOException e) { System.err.println(e); }
}

public UDPSocket(String dstHost, int dstPort) { try { this.dstHost = dstHost; this.dstPort = dstPort; clientChannel = DatagramChannel.open(); clientChannel.configureBlocking(false); } catch (IOException e) { System.err.println(e); } }

public void addListener(SocketSubscriber subscriber) { try { DatagramChannel serverChannel = DatagramChannel.open(); serverChannel.configureBlocking(false); DatagramSocket socket = serverChannel.socket(); socket.bind(new InetSocketAddress(dstPort)); SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ); key.attach(subscriber); } catch (IOException e) { System.err.println(e); } }

public void send(ByteBuffer buffer) { try { clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort)); } catch (IOException e) { System.err.println(e); } }

public void close() { try { clientChannel.close(); } catch (IOException e) { System.err.println(e); } } }

Run Code Online (Sandbox Code Playgroud)


import java.nio.ByteBuffer;

public interface SocketSubscriber { public void onData(ByteBuffer data); }

Run Code Online (Sandbox Code Playgroud)

用法示例:


public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 }

@Override public void onData(ByteBuffer data) { System.out.println("Received " + data.limit() + " bytes of data."); } }

Run Code Online (Sandbox Code Playgroud)

use*_*421 6

选择器有多个已记录的内部同步级别,您正在遇到所有这些级别。wakeup()在调用之前调用选择器确保register().循环select()正常工作,如果有零个选定的键,这将发生在wakeup().

  • 我遇到了与海报相同的问题,但我对这个解决方案并不完全满意。由于线程调度,在寄存器发生之前没有什么可以阻止唤醒和重新选择。 (5认同)
  • @DavidEhrmann如果你可以控制调用`select()`的线程的代码,你可以实现某种控制来防止立即重新选择,丑陋的例如`if (select()==0) sleep(1);`可能会为“wakeup()”提供足够的产量;register();` 在下一个 `select()` 发生之前执行。解决方案仍然有效(+1),问题来自于“缺少方法”“wakeupAndRegister()”,该方法将自动执行。 (4认同)
  • @DavidEhrmann 我对整个设计不满意。为什么三个级别的同步被“指定”发生,并硬连接到抽象基类中,这对我来说是一个谜,这就是为什么除了这个奇怪的“唤醒(”之外,从单独线程注册的机制几乎完全留给我们了) )`方法。 (3认同)
  • 由于该问题可能导致死锁,因此执行“sleep(1)”之类的操作是一个非常糟糕的主意。多线程不能做得“好”或“坏”……只有“对”或“不对”。无论如何,每次“睡眠”的出现都可能是坏事。 (2认同)